#!/usr/bin/perl -w
# Copyright (C) 2007,2008,2009,2010,2011,2012 Ole Tange and Free Software
# Foundation, Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>
# or write to the Free Software Foundation, Inc., 51 Franklin St,
# Fifth Floor, Boston, MA 02110-1301 USA
# open3 used in Job::start
use IPC::Open3;
# &WNOHANG used in reaper
use POSIX qw(:sys_wait_h setsid ceil :errno_h);
# gensym used in Job::start
use Symbol qw(gensym);
# tempfile used in Job::start
use File::Temp qw(tempfile tempdir);
# GetOptions used in get_options_from_array
use Getopt::Long;
# Used to ensure code quality
use strict;
$::oodebug=0;
$SIG{TERM} ||= sub { exit 0; }; # $SIG{TERM} is not set on Mac OS X
if(not $ENV{SHELL}) {
# $ENV{SHELL} is sometimes not set on Mac OS X
print STDERR ("parallel: Warning: \$SHELL not set. Using /bin/sh\n");
$ENV{SHELL} = "/bin/sh";
}
%Global::original_sig = %SIG;
$SIG{TERM} = sub {}; # Dummy until jobs really start
open $Global::original_stderr, ">&STDERR" or ::die_bug("Can't dup STDERR: $!");
do_not_reap();
parse_options();
my $number_of_args;
if($Global::max_number_of_args) {
$number_of_args=$Global::max_number_of_args;
} elsif ($::opt_X or $::opt_m) {
$number_of_args = undef;
} else {
$number_of_args = 1;
}
my $command = "";
if(@ARGV) {
if($Global::quoting) {
$command = shell_quote(@ARGV);
} else {
$command = join(" ", @ARGV);
}
}
my @fhlist;
@fhlist = map { open_or_exit($_) } @::opt_a;
if(not @fhlist) {
@fhlist = (*STDIN);
}
if($::opt_skip_first_line) {
# Skip the first line for the first file handle
my $fh = $fhlist[0];
<$fh>;
}
if($::opt_header and not $::opt_pipe) {
my $fh = $fhlist[0];
# split with colsep or \t
# TODO should $header force $colsep = \t if undef?
my $delimiter = $::opt_colsep;
my $id = 1;
for my $fh (@fhlist) {
my $line = <$fh>;
chomp($line);
::debug("Delimiter: '$delimiter'");
for my $s (split /$delimiter/o, $line) {
::debug("Colname: '$s'");
$command =~ s:\{$s(|/|//|\.|/\.)\}:\{$id$1\}:g;
$id++;
}
}
}
if($::opt_nonall or $::opt_onall) {
# Copy all @fhlist into tempfiles
my @argfiles = ();
for my $fh (@fhlist) {
my ($outfh,$name) = ::tempfile(SUFFIX => ".all");
print $outfh (<$fh>);
close $outfh;
push @argfiles, $name;
}
if(@::opt_basefile) { setup_basefile(); }
# for each sshlogin do:
# parallel -S $sshlogin $command :::: @argfiles
#
# Pass some of the options to the sub-parallels, not all of them as
# -P should only go to the first, and -S should not be copied at all.
my $options =
join(" ",
((defined $::opt_P) ? "-P $::opt_P" : ""),
((defined $::opt_u) ? "-u" : ""),
((defined $::opt_group) ? "-g" : ""),
((defined $::opt_D) ? "-D" : ""),
);
my $suboptions =
join(" ",
((defined $::opt_u) ? "-u" : ""),
((defined $::opt_group) ? "-g" : ""),
((defined $::opt_colsep) ? "--colsep ".shell_quote($::opt_colsep) : ""),
((defined @::opt_v) ? "-vv" : ""),
((defined $::opt_D) ? "-D" : ""),
((defined $::opt_timeout) ? "--timeout ".$::opt_timeout : ""),
);
::debug("| parallel");
open(PARALLEL,"| $0 $options") ||
::die_bug("This does not run GNU Parallel: $0 $options");
for my $sshlogin (values %Global::host) {
print PARALLEL "$0 $suboptions -j1 ".
((defined $::opt_tag) ?
"--tagstring ".shell_quote_scalar($sshlogin->string()) : "").
" -S ". shell_quote_scalar($sshlogin->string())." ".
shell_quote_scalar($command)." :::: @argfiles\n";
}
close PARALLEL;
$Global::exitstatus = $? >> 8;
debug("--onall exitvalue ",$?);
if(@::opt_basefile) { cleanup_basefile(); }
unlink(@argfiles);
wait_and_exit(min(undef_as_zero($Global::exitstatus),254));
}
$Global::JobQueue = JobQueue->new(
$command,\@fhlist,$Global::ContextReplace,$number_of_args,\@Global::ret_files);
if($::opt_eta) {
# Count the number of jobs before starting any
$Global::JobQueue->total_jobs();
}
for my $sshlogin (values %Global::host) {
$sshlogin->max_jobs_running();
}
init_run_jobs();
my $sem;
if($Global::semaphore) {
$sem = acquire_semaphore();
}
$SIG{TERM} = \&start_no_new_jobs;
start_more_jobs();
if($::opt_pipe) {
spreadstdin(@fhlist);
}
reap_if_needed();
::debug("Start draining\n");
drain_job_queue();
::debug("Done draining\n");
cleanup();
if($Global::semaphore) {
$sem->release();
}
if($::opt_halt_on_error) {
wait_and_exit($Global::halt_on_error_exitstatus);
} else {
wait_and_exit(min(undef_as_zero($Global::exitstatus),254));
}
sub __PIPE_MODE__ {}
sub spreadstdin {
# read a record
# Spawn a job and print the record to it.
my @fhlist = @_; # Filehandles to read from (Defaults to STDIN)
my $record;
my $buf = "";
my $header = "";
if($::opt_header) {
my $non_greedy_regexp = $::opt_header;
# ? , * , + , {} => ?? , *? , +? , {}?
$non_greedy_regexp =~ s/(\?|\*|\+|\})/$1\?/g;
while(read(STDIN,substr($buf,length $buf,0),$::opt_blocksize)) {
if($buf=~s/^(.*?$non_greedy_regexp)//) {
$header = $1; last;
}
}
}
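# Example (illustrative): the substitution above turns greedy
# quantifiers non-greedy, so --header '.*\n' is matched as '.*?\n'
# and the captured $header stops at the first newline of the block
# rather than at the last one.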
my ($recstart,$recend,$recerror);
if(defined($::opt_recstart) and defined($::opt_recend)) {
# If both --recstart and --recend are given then both must match
$recstart = $::opt_recstart;
$recend = $::opt_recend;
$recerror = "parallel: Warning: --recend and --recstart unmatched. Is --blocksize too small?";
} elsif(defined($::opt_recstart)) {
# If --recstart is given it must match the start of a record
$recstart = $::opt_recstart;
$recend = "";
$recerror = "parallel: Warning: --recstart unmatched. Is --blocksize too small?";
} elsif(defined($::opt_recend)) {
# If --recend is given then it must match the end of a record
$recstart = "";
$recend = $::opt_recend;
$recerror = "parallel: Warning: --recend unmatched. Is --blocksize too small?";
}
if($::opt_regexp) {
# If $recstart/$recend contains '|' this should only apply to the regexp
$recstart = "(?:".$recstart.")";
$recend = "(?:".$recend.")";
} else {
# $recstart/$recend = printf strings (\n)
$recstart =~ s/\\([rnt'"\\])/"qq|\\$1|"/gee;
$recend =~ s/\\([rnt'"\\])/"qq|\\$1|"/gee;
}
my $recendrecstart = $recend.$recstart;
# Force the while-loop once if everything was read by header reading
my $force_one_time_through = 0;
for my $in (@fhlist) {
while(!$force_one_time_through++ or read($in,substr($buf,length $buf,0),$::opt_blocksize)) {
# substr above = append to $buf
reap_if_needed(); # Re-enable reaping after read() (Bug#33352)
if($::opt_r) {
# Remove empty lines
$buf=~s/^\s*\n//gm;
if(length $buf == 0) {
next;
}
}
if($::opt_regexp) {
if($Global::max_number_of_args) {
# -N => (start..*?end){n}
while($buf =~ s/((?:$recstart.*?$recend){$Global::max_number_of_args})($recstart.*)$/$2/os) {
$record = $header.$1;
::debug("Read record -N: ".length($record)."\n");
write_record_to_pipe(\$record,$recstart,$recend);
}
} else {
# Find the last recend-recstart in $buf
if($buf =~ s/(.*$recend)($recstart.*?)$/$2/os) {
$record = $header.$1;
::debug("Matched record: ".length($record)."/".length($buf)."\n");
write_record_to_pipe(\$record,$recstart,$recend);
}
}
} else {
if($Global::max_number_of_args) {
# -N => (start..*?end){n}
my $i = 0;
while(($i = nindex(\$buf,$recendrecstart,$Global::max_number_of_args)) != -1) {
$i += length $recend; # find the actual splitting location
my $record = $header.substr($buf,0,$i);
substr($buf,0,$i) = "";
::debug("Read record: ".length($record)."\n");
write_record_to_pipe(\$record,$recstart,$recend);
}
} else {
# Find the last recend-recstart in $buf
my $i = rindex($buf,$recendrecstart);
if($i != -1) {
$i += length $recend; # find the actual splitting location
my $record = $header.substr($buf,0,$i);
substr($buf,0,$i) = "";
::debug("Read record: ".length($record)."\n");
write_record_to_pipe(\$record,$recstart,$recend);
}
}
}
do_not_reap(); # Disable reaping before read($in) (Bug#33352)
}
}
# If there is anything left in the buffer write it
substr($buf,0,0) = $header;
write_record_to_pipe(\$buf,$recstart,$recend);
::debug("Done reading input\n");
flush_and_close_pipes();
::debug("Done flushing to children\n");
$Global::start_no_new_jobs = 1;
}
sub nindex {
# Find the Nth occurrence of $str in the buffer
# Returns:
# the position where the Nth copy is found
my $buf_ref = shift;
my $str = shift;
my $n = shift;
my $i = 0;
for(1..$n) {
$i = index($$buf_ref,$str,$i+1);
if($i == -1) { last }
}
return $i;
}
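# Example (illustrative), using "\n" as the separator:
#   my $buf = "a\nb\nc\n";
#   nindex(\$buf, "\n", 2);  # returns 3 - the offset of the 2nd "\n"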
sub flush_and_close_pipes {
# Flush whatever is cached to the open pipes
# and close them.
my $flush_done;
my $sleep = 0.05;
do {
$flush_done = 1;
# Make sure everything is written to the jobs
do_not_reap();
for my $job (values %Global::running) {
if($job->remaining()) {
if($job->complete_write()) {
# Some data was written - reset sleep timer
$sleep = 0.05;
}
$flush_done = 0;
}
}
reap_if_needed();
usleep($sleep);
$sleep *= 1.1; # exponential back off
} while (not $flush_done);
do_not_reap();
for my $job (values %Global::running) {
my $fh = $job->stdin();
close $fh;
}
reap_if_needed();
}
sub write_record_to_pipe {
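# Summary of the loop below: finish any partial writes first; a job
# with nothing remaining that has already been written to gets its
# stdin closed (its block is complete); the first idle job receives
# the new record. If no job accepts it, force a reap and back off
# before retrying.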
my $record_ref = shift;
my $recstart = shift;
my $recend = shift;
if(length $$record_ref == 0) { return; }
if($::opt_remove_rec_sep) {
# Remove record separator
$$record_ref =~ s/$recend$recstart//gos;
$$record_ref =~ s/^$recstart//os;
$$record_ref =~ s/$recend$//os;
}
# Keep the pipes hot, but if nothing happens sleep should back off
my $sleep = 0.00001; # 0.00001 ms - gives better performance on high-end systems
write_record: while(1) {
# Sorting according to sequence is necessary for -k to work
do_not_reap(); # If Global::running is changed the for loop has a race condition
for my $job (sort { $a->seq() <=> $b->seq() } values %Global::running) {
::debug("Looking at ",$job->seq(),"-",$job->remaining(),"-",$job->datawritten(),"\n");
if($job->remaining()) {
# Part of the job's last record has not finished being written
if($job->complete_write()) {
# Something got written - reset sleep timer
$sleep = 0.00001;
}
} else {
if($job->datawritten() > 0) {
# There is no data remaining and we have written data before:
# So this means we have completed writing a block.
# close stdin
# This will cause the job to finish and when it dies we will spawn another job
my $fh = $job->stdin();
close $fh;
} else {
$job->write($record_ref);
# Something got written - reset sleep timer
$sleep = 0.00001;
last write_record;
}
}
}
# Force reaping as SIGCHLD is sometimes missed
if(not reaper()) {
usleep($sleep);
$sleep = ($sleep < 1000) ? ($sleep * 1.1) : ($sleep); # exponential back off
}
}
reap_if_needed();
return;
}
sub __SEM_MODE__ {}
sub acquire_semaphore {
# Acquires semaphore. If needed: spawns to the background
# Returns:
# The semaphore to be released when the job is complete
$Global::host{':'} = SSHLogin->new(":");
my $sem = Semaphore->new($Semaphore::name,$Global::host{':'}->max_jobs_running());
$sem->acquire();
debug("run");
if($Semaphore::fg) {
# skip
} else {
# If run in the background, the PID will change
# therefore release and re-acquire the semaphore
$sem->release();
if(fork()) {
exit(0);
} else {
# child
# Get a semaphore for this pid
::die_bug("Can't start a new session: $!") if setsid() == -1;
$sem = Semaphore->new($Semaphore::name,$Global::host{':'}->max_jobs_running());
$sem->acquire();
}
}
return $sem;
}
sub __PARSE_OPTIONS__ {}
sub options_hash {
# Returns a hash of the GetOptions config
return
("debug|D" => \$::opt_D,
"xargs" => \$::opt_xargs,
"m" => \$::opt_m,
"X" => \$::opt_X,
"v" => \@::opt_v,
"joblog=s" => \$::opt_joblog,
"resume" => \$::opt_resume,
"silent" => \$::opt_silent,
"keep-order|keeporder|k" => \$::opt_k,
"group" => \$::opt_group,
"g" => \$::opt_retired,
"ungroup|u" => \$::opt_u,
"null|0" => \$::opt_0,
"quote|q" => \$::opt_q,
"I=s" => \$::opt_I,
"extensionreplace|er" => \$::opt_U,
"U=s" => \$::opt_retired,
"basenamereplace|bnr=s" => \$::opt_basenamereplace,
"dirnamereplace|dnr=s" => \$::opt_dirnamereplace,
"basenameextensionreplace|bner=s" => \$::opt_basenameextensionreplace,
"seqreplace=s" => \$::opt_seqreplace,
"jobs|j=s" => \$::opt_P,
"load=s" => \$::opt_load,
"noswap" => \$::opt_noswap,
"max-line-length-allowed" => \$::opt_max_line_length_allowed,
"number-of-cpus" => \$::opt_number_of_cpus,
"number-of-cores" => \$::opt_number_of_cores,
"use-cpus-instead-of-cores" => \$::opt_use_cpus_instead_of_cores,
"shellquote|shell_quote|shell-quote" => \$::opt_shellquote,
"nice=i" => \$::opt_nice,
"timeout=i" => \$::opt_timeout,
"tag" => \$::opt_tag,
"tagstring=s" => \$::opt_tagstring,
"onall" => \$::opt_onall,
"nonall" => \$::opt_nonall,
"sshlogin|S=s" => \@::opt_sshlogin,
"sshloginfile|slf=s" => \@::opt_sshloginfile,
"controlmaster|M" => \$::opt_controlmaster,
"return=s" => \@::opt_return,
"trc=s" => \@::opt_trc,
"transfer" => \$::opt_transfer,
"cleanup" => \$::opt_cleanup,
"basefile|bf=s" => \@::opt_basefile,
"B=s" => \$::opt_retired,
"workdir|wd=s" => \$::opt_workdir,
"W=s" => \$::opt_retired,
"tmpdir=s" => \$::opt_tmpdir,
"tempdir=s" => \$::opt_tmpdir,
"tty" => \$::opt_tty,
"T" => \$::opt_retired,
"halt-on-error|halt=i" => \$::opt_halt_on_error,
"H=i" => \$::opt_retired,
"retries=i" => \$::opt_retries,
"dry-run|dryrun" => \$::opt_dryrun,
"progress" => \$::opt_progress,
"eta" => \$::opt_eta,
"arg-sep|argsep=s" => \$::opt_arg_sep,
"arg-file-sep|argfilesep=s" => \$::opt_arg_file_sep,
"trim=s" => \$::opt_trim,
"profile|J=s" => \@::opt_profile,
"pipe|spreadstdin" => \$::opt_pipe,
"recstart=s" => \$::opt_recstart,
"recend=s" => \$::opt_recend,
"regexp|regex" => \$::opt_regexp,
"remove-rec-sep|removerecsep|rrs" => \$::opt_remove_rec_sep,
"files|output-as-files|outputasfiles" => \$::opt_files,
"block|block-size|blocksize=s" => \$::opt_blocksize,
"tollef" => \$::opt_tollef,
"gnu" => \$::opt_gnu,
"xapply" => \$::opt_xapply,
"bibtex" => \$::opt_bibtex,
# xargs-compatibility - implemented, man, testsuite
"max-procs|P=s" => \$::opt_P,
"delimiter|d=s" => \$::opt_d,
"max-chars|s=i" => \$::opt_s,
"arg-file|a=s" => \@::opt_a,
"no-run-if-empty|r" => \$::opt_r,
"replace|i:s" => \$::opt_i,
"E=s" => \$::opt_E,
"eof|e:s" => \$::opt_E,
"max-args|n=i" => \$::opt_n,
"max-replace-args|N=i" => \$::opt_N,
"colsep|col-sep|C=s" => \$::opt_colsep,
"help|h" => \$::opt_help,
"L=f" => \$::opt_L,
"max-lines|l:f" => \$::opt_l,
"interactive|p" => \$::opt_p,
"verbose|t" => \$::opt_verbose,
"version|V" => \$::opt_version,
"minversion|min-version=i" => \$::opt_minversion,
"show-limits|showlimits" => \$::opt_show_limits,
"exit|x" => \$::opt_x,
# Semaphore
"semaphore" => \$::opt_semaphore,
"semaphoretimeout=i" => \$::opt_semaphoretimeout,
"semaphorename|id=s" => \$::opt_semaphorename,
"fg" => \$::opt_fg,
"bg" => \$::opt_bg,
"wait" => \$::opt_wait,
# Shebang #!/usr/bin/parallel --shebang
"shebang|hashbang" => \$::opt_shebang,
"Y" => \$::opt_retired,
"skip-first-line" => \$::opt_skip_first_line,
"header=s" => \$::opt_header,
);
}
sub get_options_from_array {
# Run GetOptions on @array
# Returns:
# true if parsing worked
# false if parsing failed
# @array is changed
my $array_ref = shift;
# A bit of shuffling of @ARGV needed as GetOptionsFromArray is not
# supported everywhere
my @save_argv;
my $this_is_ARGV = (\@::ARGV == $array_ref);
if(not $this_is_ARGV) {
@save_argv = @::ARGV;
@::ARGV = @{$array_ref};
}
my @retval = GetOptions(options_hash());
if(not $this_is_ARGV) {
@{$array_ref} = @::ARGV;
@::ARGV = @save_argv;
}
return @retval;
}
sub parse_options {
# Returns: N/A
# Defaults:
$Global::version = 20120322;
$Global::progname = 'parallel';
$Global::infinity = 2**31;
$Global::debug = 0;
$Global::verbose = 0;
$Global::grouped = 1;
$Global::keeporder = 0;
$Global::quoting = 0;
$Global::replace{'{}'} = '{}';
$Global::replace{'{.}'} = '{.}';
$Global::replace{'{/}'} = '{/}';
$Global::replace{'{//}'} = '{//}';
$Global::replace{'{/.}'} = '{/.}';
$Global::replace{'{#}'} = '{#}';
$/="\n";
$Global::ignore_empty = 0;
$Global::interactive = 0;
$Global::stderr_verbose = 0;
$Global::default_simultaneous_sshlogins = 9;
$Global::exitstatus = 0;
$Global::halt_on_error_exitstatus = 0;
$Global::arg_sep = ":::";
$Global::arg_file_sep = "::::";
$Global::trim = 'n';
$Global::max_jobs_running = 0;
$Global::job_already_run = '';
@ARGV=read_options();
if(defined $::opt_retired) {
print STDERR "$Global::progname: -g has been retired. Use --group.\n";
print STDERR "$Global::progname: -B has been retired. Use --bf.\n";
print STDERR "$Global::progname: -T has been retired. Use --tty.\n";
print STDERR "$Global::progname: -U has been retired. Use --er.\n";
print STDERR "$Global::progname: -W has been retired. Use --wd.\n";
print STDERR "$Global::progname: -Y has been retired. Use --shebang.\n";
print STDERR "$Global::progname: -H has been retired. Use --halt.\n";
::wait_and_exit(255);
}
if(defined @::opt_v) { $Global::verbose = $#::opt_v+1; } # Convert -v -v to v=2
$Global::debug = (defined $::opt_D);
if(defined $::opt_X) { $Global::ContextReplace = 1; }
if(defined $::opt_silent) { $Global::verbose = 0; }
if(defined $::opt_k) { $Global::keeporder = 1; }
if(defined $::opt_group) { $Global::grouped = 1; }
if(defined $::opt_u) { $Global::grouped = 0; }
if(defined $::opt_0) { $/ = "\0"; }
if(defined $::opt_d) { my $e="sprintf \"$::opt_d\""; $/ = eval $e; }
if(defined $::opt_p) { $Global::interactive = $::opt_p; }
if(defined $::opt_q) { $Global::quoting = 1; }
if(defined $::opt_r) { $Global::ignore_empty = 1; }
if(defined $::opt_verbose) { $Global::stderr_verbose = 1; }
if(defined $::opt_I) { $Global::replace{'{}'} = $::opt_I; }
if(defined $::opt_U) { $Global::replace{'{.}'} = $::opt_U; }
if(defined $::opt_i) {
$Global::replace{'{}'} = $::opt_i eq "" ? "{}" : $::opt_i;
}
if(defined $::opt_basenamereplace) { $Global::replace{'{/}'} = $::opt_basenamereplace; }
if(defined $::opt_dirnamereplace) { $Global::replace{'{//}'} = $::opt_dirnamereplace; }
if(defined $::opt_basenameextensionreplace) {
$Global::replace{'{/.}'} = $::opt_basenameextensionreplace;
}
if(defined $::opt_seqreplace) {
$Global::replace{'{#}'} = $::opt_seqreplace;
}
if(defined $::opt_E) { $Global::end_of_file_string = $::opt_E; }
if(defined $::opt_n) { $Global::max_number_of_args = $::opt_n; }
if(defined $::opt_timeout) { $Global::timeoutq = TimeoutQueue->new($::opt_timeout); }
if(defined $::opt_tmpdir) { $ENV{'TMPDIR'} = $::opt_tmpdir; }
if(defined $::opt_help) { die_usage(); }
if(defined $::opt_colsep) { $Global::trim = 'lr'; }
if(defined $::opt_header) { $::opt_colsep = defined $::opt_colsep ? $::opt_colsep : "\t"; }
if(defined $::opt_trim) { $Global::trim = $::opt_trim; }
if(defined $::opt_arg_sep) { $Global::arg_sep = $::opt_arg_sep; }
if(defined $::opt_arg_file_sep) { $Global::arg_file_sep = $::opt_arg_file_sep; }
if(defined $::opt_number_of_cpus) { print SSHLogin::no_of_cpus(),"\n"; wait_and_exit(0); }
if(defined $::opt_number_of_cores) {
print SSHLogin::no_of_cores(),"\n"; wait_and_exit(0);
}
if(defined $::opt_max_line_length_allowed) {
print Limits::Command::real_max_length(),"\n"; wait_and_exit(0);
}
if(defined $::opt_version) { version(); wait_and_exit(0); }
if(defined $::opt_bibtex) { bibtex(); wait_and_exit(0); }
if(defined $::opt_show_limits) { show_limits(); }
if(defined @::opt_sshlogin) { @Global::sshlogin = @::opt_sshlogin; }
if(defined @::opt_sshloginfile) { read_sshloginfiles(@::opt_sshloginfile); }
if(defined @::opt_return) { push @Global::ret_files, @::opt_return; }
if(not defined $::opt_recstart and
not defined $::opt_recend) { $::opt_recend = "\n"; }
if(not defined $::opt_blocksize) { $::opt_blocksize = "1M"; }
$::opt_blocksize = multiply_binary_prefix($::opt_blocksize);
if(defined $::opt_semaphore) { $Global::semaphore = 1; }
if(defined $::opt_semaphoretimeout) { $Global::semaphore = 1; }
if(defined $::opt_semaphorename) { $Global::semaphore = 1; }
if(defined $::opt_fg) { $Global::semaphore = 1; }
if(defined $::opt_bg) { $Global::semaphore = 1; }
if(defined $::opt_wait) { $Global::semaphore = 1; }
if(defined $::opt_minversion) {
print $Global::version,"\n";
if($Global::version < $::opt_minversion) {
wait_and_exit(255);
} else {
wait_and_exit(0);
}
}
if(defined $::opt_nonall) {
# Append a dummy empty argument
push @ARGV, ":::", "";
}
if(defined $::opt_tty) {
# Defaults for --tty: -j1 -u
# Can be overridden with -jXXX -g
if(not defined $::opt_P) {
$::opt_P = 1;
}
if(not defined $::opt_group) {
$Global::grouped = 0;
}
}
if(defined @::opt_trc) {
push @Global::ret_files, @::opt_trc;
$::opt_transfer = 1;
$::opt_cleanup = 1;
}
if($::opt_tollef and not $::opt_gnu) {
# Behave like tollef parallel (from moreutils)
if(defined $::opt_l) {
$::opt_load = $::opt_l;
$::opt_l = undef;
}
if(not defined $::opt_arg_sep) {
$Global::arg_sep = "--";
}
}
if(defined $::opt_l) {
if($::opt_l eq "-0") {
# -l -0 (swallowed -0)
$::opt_l = 1;
$::opt_0 = 1;
$/ = "\0";
} elsif ($::opt_l == 0) {
# If not given (or if 0 is given) => 1
$::opt_l = 1;
}
$Global::max_lines = $::opt_l;
$Global::max_number_of_args ||= $Global::max_lines;
}
# Read more than one arg at a time (-L, -N)
if(defined $::opt_L) {
$Global::max_lines = $::opt_L;
$Global::max_number_of_args ||= $Global::max_lines;
}
if(defined $::opt_N) {
$Global::max_number_of_args = $::opt_N;
$Global::ContextReplace = 1;
}
if((defined $::opt_L or defined $::opt_N)
and
not ($::opt_xargs or $::opt_m)) {
$Global::ContextReplace = 1;
}
for (keys %Global::replace) {
$Global::replace{$_} = ::maybe_quote($Global::replace{$_});
}
%Global::replace_rev = reverse %Global::replace;
if(grep /^$Global::arg_sep$|^$Global::arg_file_sep$/o, @ARGV) {
# Deal with ::: and ::::
@ARGV=read_args_from_command_line();
}
# Semaphore defaults
# Must be done before computing number of processes and max_line_length
# because when running as a semaphore GNU Parallel does not read args
$Global::semaphore ||= ($0 =~ m:(^|/)sem$:); # called as 'sem'
if($Global::semaphore) {
# A semaphore takes input neither from stdin nor from a file
@::opt_a = ("/dev/null");
push(@Global::unget_argv, [Arg->new("")]);
$Semaphore::timeout = $::opt_semaphoretimeout || 0;
if(defined $::opt_semaphorename) {
$Semaphore::name = $::opt_semaphorename;
} else {
$Semaphore::name = `tty`;
chomp $Semaphore::name;
}
$Semaphore::fg = $::opt_fg;
$Semaphore::wait = $::opt_wait;
$Global::default_simultaneous_sshlogins = 1;
if(not defined $::opt_P) {
$::opt_P = 1;
}
if($Global::interactive and $::opt_bg) {
print STDERR "$Global::progname: Jobs running in the ".
"background cannot be interactive.\n";
::wait_and_exit(255);
}
}
if(defined $::opt_eta) {
$::opt_progress = $::opt_eta;
}
parse_sshlogin();
if(remote_hosts() and ($::opt_X or $::opt_m or $::opt_xargs)) {
# As we do not know the max line length on the remote machine
# long commands generated by xargs may fail
# If opt_N is set, it is probably safe
print STDERR ("$Global::progname: Warning: using -X or -m ",
"with --sshlogin may fail\n");
}
if(not defined $::opt_P) {
$::opt_P = "100%";
}
open_joblog();
}
sub open_joblog {
my $append = 0;
if($::opt_resume and not $::opt_joblog) {
print STDERR ("$Global::progname: --resume requires --joblog\n");
::wait_and_exit(255);
}
if($::opt_joblog) {
if($::opt_resume) {
if(open(JOBLOG, $::opt_joblog)) {
# Read the joblog
$append = <JOBLOG>; # If there is a header: Open as append later
while(<JOBLOG>) {
if(/^(\d+)/) {
# This is 30% faster than set_job_already_run($1);
vec($Global::job_already_run,$1,1) = 1;
} else {
print STDERR ("$Global::progname: Format of '$::opt_joblog' is wrong\n");
::wait_and_exit(255);
}
}
close JOBLOG;
}
}
if($append) {
# Append to joblog
if(not open($Global::joblog,">>$::opt_joblog")) {
print STDERR ("$Global::progname: Cannot append to ",
"--joblog $::opt_joblog\n");
::wait_and_exit(255);
}
} else {
# Overwrite the joblog
if(not open($Global::joblog,">$::opt_joblog")) {
print STDERR ("$Global::progname: Cannot write to ",
"--joblog $::opt_joblog\n");
::wait_and_exit(255);
} else {
print $Global::joblog
join("\t", "Seq", "Host", "Starttime", "Runtime",
"Send", "Receive", "Exitval", "Signal", "Command"
). "\n";
}
}
}
}
sub read_options {
# Read options from command line, profile and $PARALLEL
# Returns:
# @ARGV without --options
# This must be done first as this may exec myself
if(defined $ARGV[0] and ($ARGV[0]=~/^--shebang / or
$ARGV[0]=~/^--hashbang /)) {
# Program is called from #! line in script
$ARGV[0]=~s/^--shebang *//; # remove --shebang if it is set
$ARGV[0]=~s/^--hashbang *//; # remove --hashbang if it is set
my $argfile = pop @ARGV;
# exec myself to split $ARGV[0] into separate fields
exec "$0 --skip-first-line -a $argfile @ARGV";
}
Getopt::Long::Configure("bundling","pass_through");
# Check if there is a --profile to set @::opt_profile
GetOptions("profile|J=s" => \@::opt_profile) || die_usage();
# Add options from .parallel/config and other profiles
my @ARGV_profile = ();
my @ARGV_env = ();
my @config_profiles = (
"/etc/parallel/config",
$ENV{'HOME'}."/.parallel/config",
$ENV{'HOME'}."/.parallelrc");
my @profiles = @config_profiles;
if(@::opt_profile) {
# --profile overrides default profiles
@profiles = ();
for my $profile (@::opt_profile) {
push @profiles, $ENV{'HOME'}."/.parallel/".$profile;
}
}
for my $profile (@profiles) {
if(-r $profile) {
open (IN, "<", $profile) || ::die_bug("read-profile: $profile");
while(<IN>) {
/^\s*\#/ and next;
chomp;
push @ARGV_profile, shell_unquote(split/(?<![\\])\s/, $_);
}
close IN;
} else {
if(grep /^$profile$/, @config_profiles) {
# config file is not required to exist
} else {
print STDERR "$Global::progname: $profile not readable\n";
wait_and_exit(255);
}
}
}
Getopt::Long::Configure("bundling","require_order");
get_options_from_array(\@ARGV_profile) || die_usage();
# Add options from shell variable $PARALLEL
$ENV{'PARALLEL'} and @ARGV_env = shell_unquote(split/(?<![\\])\s/, $ENV{'PARALLEL'});
get_options_from_array(\@ARGV_env) || die_usage();
get_options_from_array(\@ARGV) || die_usage();
# Prepend non-options to @ARGV (such as commands like 'nice')
unshift @ARGV, @ARGV_profile, @ARGV_env;
return @ARGV;
}
sub read_args_from_command_line {
# Arguments given on the command line after:
# ::: ($Global::arg_sep)
# :::: ($Global::arg_file_sep)
# Removes the arguments from @ARGV and:
# - puts filenames into -a
# - puts arguments into files and adds the files to -a
# Returns:
# @ARGV without ::: and :::: and following args
# Input: @ARGV = command option ::: arg arg arg :::: argfiles
my @new_argv = ();
for(my $arg = shift @ARGV; @ARGV; $arg = shift @ARGV) {
if($arg eq $Global::arg_sep
or
$arg eq $Global::arg_file_sep) {
my $group = $arg; # This group of arguments is args or argfiles
my @group;
while(defined ($arg = shift @ARGV)) {
if($arg eq $Global::arg_sep
or
$arg eq $Global::arg_file_sep) {
# exit the while loop when a new separator is found
last;
} else {
# If not hitting ::: or ::::
# Append it to the group
push @group, $arg;
}
}
if($group eq $Global::arg_sep) {
# Group of arguments on the command line.
# Put them into a file.
# Create argfile
my ($outfh,$name) = ::tempfile(SUFFIX => ".arg");
unlink($name);
# Put args into argfile
print $outfh map { $_,$/ } @group;
seek $outfh, 0, 0;
# Append filehandle to -a
push @::opt_a, $outfh;
} elsif($group eq $Global::arg_file_sep) {
# Group of file names on the command line.
# Append args into -a
push @::opt_a, @group;
} else {
::die_bug("Unknown command line group: $group");
}
if(defined($arg)) {
# $arg is ::: or ::::
redo;
} else {
# $arg is undef -> @ARGV empty
last;
}
}
push @new_argv, $arg;
}
# Output: @ARGV = command to run with options
return @new_argv;
}
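# Example (illustrative, hypothetical file name):
#   parallel echo ::: A B :::: args.txt
# keeps "echo" in the returned @ARGV, writes A and B to a temporary
# argfile whose filehandle is pushed onto @::opt_a, and pushes the
# file name args.txt onto @::opt_a as-is.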
sub cleanup {
# Returns: N/A
if(@::opt_basefile) { cleanup_basefile(); }
}
sub __QUOTING_ARGUMENTS_FOR_SHELL__ {}
sub shell_quote {
my @strings = (@_);
for my $a (@strings) {
$a =~ s/([\002-\011\013-\032\\\#\?\`\(\)\{\}\[\]\*\>\<\~\|\; \"\!\$\&\'])/\\$1/g;
$a =~ s/[\n]/'\n'/g; # a newline in a filename is wrapped in single quotes
}
return wantarray ? @strings : "@strings";
}
sub shell_quote_scalar {
# Quote the string so shell will not expand any special chars
# Returns:
# string quoted with \ as needed by the shell
my $a = shift;
$a =~ s/([\002-\011\013-\032\\\#\?\`\(\)\{\}\[\]\*\>\<\~\|\; \"\!\$\&\'])/\\$1/g;
$a =~ s/[\n]/'\n'/g; # a newline in a filename is wrapped in single quotes
return $a;
}
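# Example (illustrative):
#   shell_quote_scalar(q(it's a "test"))  # returns  it\'s\ a\ \"test\"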
sub maybe_quote {
# If $Global::quoting then quote the string so shell will not expand any special chars
# Else do not quote
# Returns:
# if $Global::quoting string quoted with \ as needed by the shell
# else string unaltered
if($Global::quoting) {
return shell_quote_scalar(@_);
} else {
return "@_";
}
}
sub maybe_unquote {
# If $Global::quoting then unquote the string as shell would
# Else do not unquote
# Returns:
# if $Global::quoting string unquoted as done by the shell
# else string unaltered
if($Global::quoting) {
return shell_unquote(@_);
} else {
return "@_";
}
}
sub shell_unquote {
# Unquote strings from shell_quote
# Returns:
# string with shell quoting removed
my @strings = (@_);
my $arg;
for $arg (@strings) {
if(not defined $arg) {
$arg = "";
}
$arg =~ s/'\n'/\n/g; # restore newlines that were wrapped in single quotes
$arg =~ s/\\([\002-\011\013-\032])/$1/g;
$arg =~ s/\\([\#\?\`\(\)\{\}\*\>\<\~\|\; \"\!\$\&\'])/$1/g;
$arg =~ s/\\\\/\\/g;
}
return wantarray ? @strings : "@strings";
}
sub __FILEHANDLES__ {}
sub enough_file_handles {
# check that we have enough filehandles available for starting
# another job
# Returns:
# 1 if ungrouped (thus not needing extra filehandles)
# 0 if too few filehandles
# 1 if enough filehandles
if($Global::grouped) {
my %fh;
my $enough_filehandles = 1;
# We need a filehandle for STDOUT and STDERR
# perl uses 7 filehandles for something?
# open3 uses 2 extra filehandles temporarily
for my $i (1..8) {
$enough_filehandles &&= open($fh{$i},"</dev/null");
}
for (values %fh) { close $_; }
return $enough_filehandles;
} else {
return 1;
}
}
sub open_or_exit {
# Returns:
# file handle to read-opened file
# exits if file cannot be opened
my $file = shift;
if($file eq "-") {
$Global::stdin_in_opt_a = 1;
return ($Global::original_stdin || *STDIN);
}
if(ref $file eq "GLOB") {
# This is an open filehandle
return $file;
}
my $fh = gensym;
if(not open($fh,"<",$file)) {
print STDERR "$Global::progname: ".
"Cannot open input file `$file': ".
"No such file or directory\n";
wait_and_exit(255);
}
return $fh;
}
sub __RUNNING_THE_JOBS_AND_PRINTING_PROGRESS__ {}
# Variable structure:
#
# $Global::running{$pid} = Pointer to Job-object
# $Global::host{$sshlogin} = Pointer to SSHLogin-object
# $Global::total_running = total number of running jobs
# $Global::total_started = total jobs started
sub init_run_jobs {
# Remember the original STDIN, STDOUT and STDERR
# Returns: N/A
open $Global::original_stdout, ">&STDOUT" or
::die_bug("Can't dup STDOUT: $!");
open $Global::original_stderr, ">&STDERR" or
::die_bug("Can't dup STDERR: $!");
open $Global::original_stdin, "<&STDIN" or
::die_bug("Can't dup STDIN: $!");
$Global::total_running = 0;
$Global::total_started = 0;
$Global::tty_taken = 0;
$SIG{USR1} = \&list_running_jobs;
$SIG{USR2} = \&toggle_progress;
if(@::opt_basefile) { setup_basefile(); }
}
sub start_more_jobs {
# Returns:
# number of jobs started
my $jobs_started = 0;
if(not $Global::start_no_new_jobs) {
if($Global::max_procs_file) {
my $mtime = (stat($Global::max_procs_file))[9];
if($mtime > $Global::max_procs_file_last_mod) {
$Global::max_procs_file_last_mod = $mtime;
for my $sshlogin (values %Global::host) {
$sshlogin->set_max_jobs_running(undef);
}
}
}
if($Global::max_load_file) {
my $mtime = (stat($Global::max_load_file))[9];
if($mtime > $Global::max_load_file_last_mod) {
$Global::max_load_file_last_mod = $mtime;
for my $sshlogin (values %Global::host) {
$sshlogin->set_max_loadavg(undef);
}
}
}
for my $sshlogin (values %Global::host) {
debug("Running jobs before on ".$sshlogin->string().": ".$sshlogin->jobs_running()."\n");
if($::opt_load and $sshlogin->loadavg_too_high()) {
# The load is too high or unknown
next;
}
if($::opt_noswap and $sshlogin->swapping()) {
# The server is swapping
next;
}
while ($sshlogin->jobs_running() < $sshlogin->max_jobs_running()) {
if($Global::JobQueue->empty() and not $::opt_pipe) {
last;
}
debug($sshlogin->string()." has ".$sshlogin->jobs_running()
. " out of " . $sshlogin->max_jobs_running()
. " jobs running. Start another.\n");
if(start_another_job($sshlogin) == 0) {
# No more jobs to start on this $sshlogin
debug("No jobs started on ".$sshlogin->string()."\n");
last;
}
debug("Job started on ".$sshlogin->string()."\n");
$sshlogin->inc_jobs_running();
$jobs_started++;
}
debug("Running jobs after on ".$sshlogin->string().": ".$sshlogin->jobs_running()
." of ".$sshlogin->max_jobs_running() ."\n");
}
}
return $jobs_started;
}
sub start_another_job {
# Grab a job from Global::JobQueue, start it at sshlogin
# and remember the pid, the STDOUT and the STDERR handles
# Returns:
# 1 if another job was started
# 0 otherwise
my $sshlogin = shift;
# Do we have enough file handles to start another job?
if(enough_file_handles()) {
if($Global::JobQueue->empty() and not $::opt_pipe) {
# No more commands to run
debug("Not starting: JobQueue empty\n");
return 0;
} else {
my $job;
do {
$job = get_job_with_sshlogin($sshlogin);
if(not defined $job) {
# No command available for that sshlogin
debug("Not starting: no jobs available for ".$sshlogin->string()."\n");
return 0;
}
} while ($job->is_already_in_joblog());
debug("Command to run on '".$job->sshlogin()."': '".$job->replaced()."'\n");
if($job->start()) {
$Global::running{$job->pid()} = $job;
debug("Started as seq ".$job->seq(),"\n");
return 1;
} else {
# If interactive says: Don't run the job, then skip it and run the next
return start_another_job($sshlogin);
}
}
} else {
# No more file handles
debug("Not starting: no more file handles\n");
return 0;
}
}
sub drain_job_queue {
# Returns: N/A
$Private::first_completed ||= time;
if($::opt_progress) {
do_not_reap();
print $Global::original_stderr init_progress();
reap_if_needed();
}
my $last_header="";
my $sleep = 0.2;
do {
while($Global::total_running > 0) {
debug("jobs running: ", $Global::total_running, "==", scalar
keys %Global::running," slots: ", $Global::max_jobs_running,
" Memory usage:".my_memory_usage()." ");
$sleep = ($sleep < 1000) ? ($sleep * 1.1) : ($sleep);
usleep($sleep);
do_not_reap();
if($::opt_pipe) {
# When using --pipe sometimes file handles are not closed properly
for my $job (values %Global::running) {
my $fh = $job->stdin();
close $fh;
}
}
if($::opt_progress) {
my %progress = progress();
if($last_header ne $progress{'header'}) {
print $Global::original_stderr "\n",$progress{'header'},"\n";
$last_header = $progress{'header'};
}
print $Global::original_stderr "\r",$progress{'status'};
}
# Sometimes SIGCHLD is not registered, so force reaper
if(reaper()) {
# Child finished this time around: Reset sleep time
$sleep = 0.2;
}
reap_if_needed();
}
if(not $Global::JobQueue->empty()) {
start_more_jobs(); # These jobs may not be started because of loadavg
$sleep = ($sleep < 1000) ? ($sleep * 1.1) : ($sleep);
usleep($sleep);
}
} while ($Global::total_running > 0
or
not $Global::start_no_new_jobs and not $Global::JobQueue->empty());
if($::opt_progress) {
print $Global::original_stderr "\n";
}
}
sub toggle_progress {
# Turn on/off progress view
# Returns: N/A
$::opt_progress = not $::opt_progress;
if($::opt_progress) {
print $Global::original_stderr init_progress();
}
}
sub init_progress {
# Returns:
# list of computers for progress output
$|=1;
my %progress = progress();
return ("\nComputers / CPU cores / Max jobs to run\n",
$progress{'workerlist'});
}
sub progress {
# Returns:
# list of workers
# header that will fit on the screen
# status message that will fit on the screen
my $termcols = terminal_columns();
my ($status, $header) = ("x"x($termcols+1),"");
my @workers = sort keys %Global::host;
my %sshlogin = map { $_ eq ":" ? ($_=>"local") : ($_=>$_) } @workers;
my $workerno = 1;
my %workerno = map { ($_=>$workerno++) } @workers;
my $workerlist = "";
for my $w (@workers) {
$workerlist .=
$workerno{$w}.":".$sshlogin{$w} ." / ".
($Global::host{$w}->ncpus() || "-")." / ".
$Global::host{$w}->max_jobs_running()."\n";
}
my $eta = "";
if($::opt_eta) {
my $completed = 0;
for(@workers) { $completed += $Global::host{$_}->jobs_completed() }
if($completed) {
my $total = $Global::JobQueue->total_jobs();
my $left = $total - $completed;
my $pctcomplete = $completed / $total;
my $timepassed = (time - $Private::first_completed);
my $avgtime = $timepassed / $completed;
$Private::smoothed_avg_time ||= $avgtime;
# Smooth the eta so it does not jump wildly
$Private::smoothed_avg_time = (1 - $pctcomplete) *
$Private::smoothed_avg_time + $pctcomplete * $avgtime;
my $this_eta;
$Private::last_time ||= $timepassed;
if($timepassed != $Private::last_time
or not defined $Private::last_eta) {
$Private::last_time = $timepassed;
$this_eta = $left * $Private::smoothed_avg_time;
$Private::last_eta = $this_eta;
} else {
$this_eta = $Private::last_eta;
}
$eta = sprintf("ETA: %ds %dleft %.2favg ", $this_eta, $left, $avgtime);
}
}
if(length $status > $termcols) {
# sshlogin1:XX/XX/XX%/XX.Xs sshlogin2:XX/XX/XX%/XX.Xs sshlogin3:XX/XX/XX%/XX.Xs
$header = "Computer:jobs running/jobs completed/%of started jobs/Average seconds to complete";
$status = $eta .
join(" ",map
{
if($Global::total_started) {
my $completed = ($Global::host{$_}->jobs_completed()||0);
my $running = $Global::host{$_}->jobs_running();
my $time = $completed ? (time-$^T)/($completed) : "0";
sprintf("%s:%d/%d/%d%%/%.1fs ",
$sshlogin{$_}, $running, $completed,
($running+$completed)*100
/ $Global::total_started, $time);
}
} @workers);
}
if(length $status > $termcols) {
# 1:XX/XX/XX%/XX.Xs 2:XX/XX/XX%/XX.Xs 3:XX/XX/XX%/XX.Xs 4:XX/XX/XX%/XX.Xs
$header = "Computer:jobs running/jobs completed/%of started jobs";
$status = $eta .
join(" ",map
{
my $completed = ($Global::host{$_}->jobs_completed()||0);
my $running = $Global::host{$_}->jobs_running();
my $time = $completed ? (time-$^T)/($completed) : "0";
sprintf("%s:%d/%d/%d%%/%.1fs ",
$workerno{$_}, $running, $completed,
($running+$completed)*100
/ $Global::total_started, $time);
} @workers);
}
if(length $status > $termcols) {
# sshlogin1:XX/XX/XX% sshlogin2:XX/XX/XX% sshlogin3:XX/XX/XX%
$header = "Computer:jobs running/jobs completed/%of started jobs";
$status = $eta .
join(" ",map
{ sprintf("%s:%d/%d/%d%%",
$sshlogin{$_},
$Global::host{$_}->jobs_running(),
($Global::host{$_}->jobs_completed()||0),
($Global::host{$_}->jobs_running()+
($Global::host{$_}->jobs_completed()||0))*100
/ $Global::total_started) }
@workers);
}
if(length $status > $termcols) {
# 1:XX/XX/XX% 2:XX/XX/XX% 3:XX/XX/XX% 4:XX/XX/XX% 5:XX/XX/XX% 6:XX/XX/XX%
$header = "Computer:jobs running/jobs completed/%of started jobs";
$status = $eta .
join(" ",map
{ sprintf("%s:%d/%d/%d%%",
$workerno{$_},
$Global::host{$_}->jobs_running(),
($Global::host{$_}->jobs_completed()||0),
($Global::host{$_}->jobs_running()+
($Global::host{$_}->jobs_completed()||0))*100
/ $Global::total_started) }
@workers);
}
if(length $status > $termcols) {
# sshlogin1:XX/XX/XX% sshlogin2:XX/XX/XX% sshlogin3:XX/XX sshlogin4:XX/XX
$header = "Computer:jobs running/jobs completed";
$status = $eta .
join(" ",map
{ sprintf("%s:%d/%d",
$sshlogin{$_}, $Global::host{$_}->jobs_running(),
($Global::host{$_}->jobs_completed()||0)) }
@workers);
}
if(length $status > $termcols) {
# sshlogin1:XX/XX sshlogin2:XX/XX sshlogin3:XX/XX sshlogin4:XX/XX
$header = "Computer:jobs running/jobs completed";
$status = $eta .
join(" ",map
{ sprintf("%s:%d/%d",
$sshlogin{$_}, $Global::host{$_}->jobs_running(),
($Global::host{$_}->jobs_completed()||0)) }
@workers);
}
if(length $status > $termcols) {
# 1:XX/XX 2:XX/XX 3:XX/XX 4:XX/XX 5:XX/XX 6:XX/XX
$header = "Computer:jobs running/jobs completed";
$status = $eta .
join(" ",map
{ sprintf("%s:%d/%d",
$workerno{$_}, $Global::host{$_}->jobs_running(),
($Global::host{$_}->jobs_completed()||0)) }
@workers);
}
if(length $status > $termcols) {
# sshlogin1:XX sshlogin2:XX sshlogin3:XX sshlogin4:XX sshlogin5:XX
$header = "Computer:jobs completed";
$status = $eta .
join(" ",map
{ sprintf("%s:%d",
$sshlogin{$_},
($Global::host{$_}->jobs_completed()||0)) }
@workers);
}
if(length $status > $termcols) {
# 1:XX 2:XX 3:XX 4:XX 5:XX 6:XX
$header = "Computer:jobs completed";
$status = $eta .
join(" ",map
{ sprintf("%s:%d",
$workerno{$_},
($Global::host{$_}->jobs_completed()||0)) }
@workers);
}
return ("workerlist" => $workerlist, "header" => $header, "status" => $status);
}
sub terminal_columns {
# Get the number of columns of the display
# Returns:
# number of columns of the screen
if(not $Private::columns) {
$Private::columns = $ENV{'COLUMNS'};
if(not $Private::columns) {
my $resize = qx{ resize 2>/dev/null };
$resize =~ /COLUMNS=(\d+);/ and do { $Private::columns = $1; };
}
$Private::columns ||= 80;
}
return $Private::columns;
}
sub get_job_with_sshlogin {
# Returns:
# the next job to be run on $sshlogin
# undef if no job is available for it
my $sshlogin = shift;
if($::oodebug and $Global::JobQueue->empty()) {
Carp::confess("get_job_with_sshlogin should never be called if empty");
}
my $job = $Global::JobQueue->get();
if(not defined $job) {
# No more jobs
::debug("No more jobs: JobQueue empty\n");
return undef;
}
if($::oodebug and not defined $job->{'commandline'}) {
Carp::confess("get_job_with_sshlogin job->commandline should never be empty");
}
my $clean_command = $job->replaced();
if($clean_command =~ /^\s*$/) {
# Do not run empty lines
if(not $Global::JobQueue->empty()) {
return get_job_with_sshlogin($sshlogin);
} else {
return undef;
}
}
$job->set_sshlogin($sshlogin);
if($::opt_retries and $clean_command and
$job->failed_here()) {
# This command with these args failed for this sshlogin
my ($no_of_failed_sshlogins,$min_failures) = $job->min_failed();
#::my_dump(($no_of_failed_sshlogins,$min_failures));
if($no_of_failed_sshlogins == keys %Global::host and
$job->failed_here() == $min_failures) {
# It failed the same or more times on another host:
# run it on this host
} else {
# If it failed fewer times on another host:
# Find another job to run
my $nextjob;
if(not $Global::JobQueue->empty()) {
# This can potentially recurse for all args
no warnings 'recursion';
$nextjob = get_job_with_sshlogin($sshlogin);
}
# Push the command back on the queue
$Global::JobQueue->unget($job);
return $nextjob;
}
}
return $job;
}
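# Summary of the --retries handling above: a job that already failed on
# this sshlogin is only retried here if it has failed on every host and
# its failure count here equals the minimum across hosts; otherwise it
# is pushed back on the queue and another job is picked for this host.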
sub __REMOTE_SSH__ {}
sub read_sshloginfiles {
# Returns: N/A
for (@_) {
read_sshloginfile($_);
}
}
sub read_sshloginfile {
# Returns: N/A
my $file = shift;
my $close = 1;
if($file eq "..") {
$file = $ENV{'HOME'}."/.parallel/sshloginfile";
}
if($file eq ".") {
$file = "/etc/parallel/sshloginfile";
}
if($file eq "-") {
*IN = *STDIN;
$close = 0;
} else {
if(not open(IN, $file)) {
print $Global::original_stderr "Cannot open $file\n";
exit(255);
}
}
while(<IN>) {
chomp;
/^\s*#/ and next;
/^\s*$/ and next;
push @Global::sshlogin, $_;
}
if($close) {
close IN;
}
}
sub parse_sshlogin {
# Returns: N/A
my @login;
if(not @Global::sshlogin) { @Global::sshlogin = (":"); }
for my $sshlogin (@Global::sshlogin) {
# Split up -S sshlogin,sshlogin
for my $s (split /,/, $sshlogin) {
if ($s eq ".." or $s eq "-") {
read_sshloginfile($s);
} else {
push (@login, $s);
}
}
}
for my $sshlogin_string (@login) {
my $sshlogin = SSHLogin->new($sshlogin_string);
$sshlogin->set_maxlength(Limits::Command::max_length());
$Global::host{$sshlogin->string()} = $sshlogin;
}
#debug("sshlogin: ", my_dump(%Global::host),"\n");
if($::opt_transfer or @::opt_return or $::opt_cleanup or @::opt_basefile) {
if(not remote_hosts()) {
# There are no remote hosts
if(defined @::opt_trc) {
print $Global::original_stderr
"parallel: Warning: --trc ignored as there are no remote --sshlogin\n";
} elsif (defined $::opt_transfer) {
print $Global::original_stderr
"parallel: Warning: --transfer ignored as there are no remote --sshlogin\n";
} elsif (defined @::opt_return) {
print $Global::original_stderr
"parallel: Warning: --return ignored as there are no remote --sshlogin\n";
} elsif (defined $::opt_cleanup) {
print $Global::original_stderr
"parallel: Warning: --cleanup ignored as there are no remote --sshlogin\n";
} elsif (defined @::opt_basefile) {
print $Global::original_stderr
"parallel: Warning: --basefile ignored as there are no remote --sshlogin\n";
}
}
}
}
sub remote_hosts {
# Return sshlogins that are not ':'
# Returns:
# list of sshlogins with ':' removed
return grep !/^:$/, keys %Global::host;
}
sub setup_basefile {
# Transfer basefiles to each $sshlogin
# This needs to be done before the first job on $sshlogin is run
# Returns: N/A
my $cmd = "";
for my $sshlogin (values %Global::host) {
if($sshlogin->string() eq ":") { next }
my $sshcmd = $sshlogin->sshcommand();
my $serverlogin = $sshlogin->serverlogin();
my $rsync_opt = "-rlDzR -e".shell_quote_scalar($sshcmd);
for my $file (@::opt_basefile) {
my $f = $file;
my $relpath = ($f !~ m:^/:); # Is the path relative?
# Use different subdirs depending on abs or rel path
my $rsync_destdir = ($relpath ? "./" : "/");
$f =~ s:/\./:/:g; # Rsync treats /./ specially. We don't want that
$f = shell_quote_scalar($f);
$cmd .= "rsync $rsync_opt $f $serverlogin:$rsync_destdir &";
}
}
$cmd .= "wait;";
debug("basesetup: $cmd\n");
print `$cmd`;
}
sub cleanup_basefile {
# Remove the basefiles transferred
# Returns: N/A
my $cmd="";
for my $sshlogin (values %Global::host) {
if($sshlogin->string() eq ":") { next }
my $sshcmd = $sshlogin->sshcommand();
my $serverlogin = $sshlogin->serverlogin();
for my $file (@::opt_basefile) {
$cmd .= "$sshcmd $serverlogin rm -f ".shell_quote_scalar(shell_quote_scalar($file))."&";
}
}
$cmd .= "wait;";
debug("basecleanup: $cmd\n");
print `$cmd`;
}
sub __SIGNAL_HANDLING__ {}
sub list_running_jobs {
# Returns: N/A
for my $v (values %Global::running) {
print $Global::original_stderr "$Global::progname: ",$v->replaced(),"\n";
}
}
sub start_no_new_jobs {
# Returns: N/A
$SIG{TERM} = $Global::original_sig{TERM};
print $Global::original_stderr
("$Global::progname: SIGTERM received. No new jobs will be started.\n",
"$Global::progname: Waiting for these ", scalar(keys %Global::running),
" jobs to finish. Send SIGTERM again to stop now.\n");
list_running_jobs();
$Global::start_no_new_jobs++;
}
sub count_sig_child {
# Returns: N/A
$Global::sig_child_caught++;
}
sub do_not_reap {
# This will postpone SIGCHLD for sections that cannot be interrupted by a dying child
# (race condition)
# Returns: N/A
$SIG{CHLD} = \&count_sig_child;
}
sub reap_if_needed {
# Handle any postponed SIGCHLDs and re-install the normal reaper for SIGCHLD
# (race condition)
# Returns: N/A
if($Global::sig_child_caught) {
$Global::sig_child_caught = 0;
reaper();
}
$SIG{CHLD} = \&reaper;
}
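# Typical usage of the pair above:
#   do_not_reap();      # postpone SIGCHLD around a critical section
#   ...
#   reap_if_needed();   # run any postponed reaps, reinstall the reaper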
sub reaper {
# A job finished.
# Print the output.
# Start another job
# Returns: N/A
do_not_reap();
$Private::reaperlevel++;
my $stiff;
my $children_reaped = 0;
debug("Reaper called $Private::reaperlevel ");
while (($stiff = waitpid(-1, &WNOHANG)) > 0) {
$children_reaped++;
if($Global::sshmaster{$stiff}) {
# This is one of the ssh -M: ignore
next;
}
# Ignore processes that we did not start
my $job = $Global::running{$stiff};
$job or next;
$job->set_exitstatus($? >> 8);
$job->set_exitsignal($? & 127);
debug("died (".$job->exitstatus()."): ".$job->seq());
$job->set_endtime();
if($stiff == $Global::tty_taken) {
# The process that died had the tty => release it
$Global::tty_taken = 0;
}
if(not $job->should_be_retried()) {
# Force printing now if the job failed and we are going to exit
my $print_now = ($job->exitstatus() and
$::opt_halt_on_error and $::opt_halt_on_error == 2);
if($Global::keeporder and not $print_now) {
$Private::print_later{$job->seq()} = $job;
$Private::job_end_sequence ||= 1;
debug("Looking for: $Private::job_end_sequence ".
"Current: ".$job->seq()."\n");
while($Private::print_later{$Private::job_end_sequence}) {
debug("Found job end $Private::job_end_sequence");
$Private::print_later{$Private::job_end_sequence}->print();
delete $Private::print_later{$Private::job_end_sequence};
$Private::job_end_sequence++;
}
} else {
$job->print();
}
if($job->exitstatus()) {
# The job had an exit status <> 0, so it is an error
$Global::exitstatus++;
if($::opt_halt_on_error) {
if($::opt_halt_on_error == 1) {
# If halt on error == 1 we should gracefully exit
print $Global::original_stderr
("$Global::progname: Starting no more jobs. ",
"Waiting for ", scalar(keys %Global::running),
" jobs to finish. This job failed:\n",
$job->replaced(),"\n");
$Global::start_no_new_jobs++;
$Global::halt_on_error_exitstatus = $job->exitstatus();
} elsif($::opt_halt_on_error == 2) {
# If halt on error == 2 we should exit immediately
print $Global::original_stderr
("$Global::progname: This job failed:\n",
$job->replaced(),"\n");
exit ($job->exitstatus());
}
}
}
}
my $sshlogin = $job->sshlogin();
$sshlogin->dec_jobs_running();
$sshlogin->inc_jobs_completed();
$Global::total_running--;
delete $Global::running{$stiff};
start_more_jobs();
}
reap_if_needed();
debug("Reaper exit $Private::reaperlevel\n");
$Private::reaperlevel--;
return $children_reaped;
}
sub timeout {
# SIGALRM was received. Check if there was a timeout
# @Global::timeouts is sorted by timeout
while (@Global::timeouts) {
my $t = $Global::timeouts[0];
if($t->timed_out()) {
$t->kill();
shift @Global::timeouts;
} else {
# Because they are sorted by timeout
last;
}
}
}
sub __USAGE__ {}
sub wait_and_exit {
# If we do not wait, we sometimes get a segfault
# Returns: N/A
for (keys %Global::unkilled_children) {
kill 9, $_;
waitpid($_,0);
delete $Global::unkilled_children{$_};
}
wait();
exit(shift);
}
sub die_usage {
# Returns: N/A
usage();
wait_and_exit(255);
}
sub usage {
# Returns: N/A
print join
("\n",
"Usage:",
"$Global::progname [options] [command [arguments]] < list_of_arguments",
"$Global::progname [options] [command [arguments]] (::: arguments|:::: argfile(s))...",
"cat ... | $Global::progname --pipe [options] [command [arguments]]",
"",
"-j n Run n jobs in parallel",
"-k Keep same order",
"-X Multiple arguments with context replace",
"--colsep regexp Split input on regexp for positional replacements",
"{} {.} {/} {/.} {#} Replacement strings",
"{3} {3.} {3/} {3/.} Positional replacement strings",
"",
"-S sshlogin Example: foo\@server.example.com",
"--slf .. Use ~/.parallel/sshloginfile as the list of sshlogins",
"--trc {}.bar Shorthand for --transfer --return {}.bar --cleanup",
"--onall Run the given command with argument on all sshlogins",
"--nonall Run the given command with no arguments on all sshlogins",
"",
"--pipe Split stdin (standard input) to multiple jobs.",
"--recend str Record end separator for --pipe.",
"--recstart str Record start separator for --pipe.",
"",
"See 'man $Global::progname' for details",
"",
"When using GNU Parallel for a publication please cite:",
"",
"O. Tange (2011): GNU Parallel - The Command-Line Power Tool,",
";login: The USENIX Magazine, February 2011:42-47.",
"");
}
sub die_bug {
my $bugid = shift;
print STDERR
("$Global::progname: This should not happen. You have found a bug.\n",
"Please contact <parallel\@gnu.org> and include:\n",
"* The version number: $Global::version\n",
"* The bugid: $bugid\n",
"* The command line being run\n",
"* The files being read (put the files on a webserver if they are big)\n",
"\n",
"If you get the error on smaller/fewer files, please include those instead.\n");
::wait_and_exit(255);
}
sub version {
# Returns: N/A
if($::opt_tollef and not $::opt_gnu) {
print "WARNING: YOU ARE USING --tollef. USE --gnu FOR GNU PARALLEL\n\n";
}
print join("\n",
"GNU $Global::progname $Global::version",
"Copyright (C) 2007,2008,2009,2010,2011,2012 Ole Tange and Free Software Foundation, Inc.",
"License GPLv3+: GNU GPL version 3 or later <http://gnu.org/licenses/gpl.html>",
"This is free software: you are free to change and redistribute it.",
"GNU $Global::progname comes with no warranty.",
"",
"Web site: http://www.gnu.org/software/${Global::progname}\n",
"When using GNU Parallel for a publication please cite:\n",
"O. Tange (2011): GNU Parallel - The Command-Line Power Tool, ",
";login: The USENIX Magazine, February 2011:42-47.\n",
);
}
sub bibtex {
# Returns: N/A
if($::opt_tollef and not $::opt_gnu) {
print "WARNING: YOU ARE USING --tollef. USE --gnu FOR GNU PARALLEL\n\n";
}
print join("\n",
"\@article{Tange2011a,",
" title = {GNU Parallel - The Command-Line Power Tool},",
" author = {O. Tange},",
" address = {Frederiksberg, Denmark},",
" journal = {;login: The USENIX Magazine},",
" month = {Feb},",
" number = {1},",
" volume = {36},",
" url = {http://www.gnu.org/s/parallel},",
" year = {2011},",
" pages = {42-47}",
"}",
"",
);
}
sub show_limits {
# Returns: N/A
print("Maximal size of command: ",Limits::Command::real_max_length(),"\n",
"Maximal used size of command: ",Limits::Command::max_length(),"\n",
"\n",
"Execution of will continue now, and it will try to read its input\n",
"and run commands; if this is not what you wanted to happen, please\n",
"press CTRL-D or CTRL-C\n");
}
sub __GENERIC_COMMON_FUNCTION__ {}
sub min {
# Returns:
# Minimum value of array
my $min;
for (@_) {
# Skip undefs
defined $_ or next;
defined $min or do { $min = $_; next; }; # Set $min to the first non-undef value
$min = ($min < $_) ? $min : $_;
}
return $min;
}
sub max {
# Returns:
# Maximum value of array
my $max;
for (@_) {
# Skip undefs
defined $_ or next;
defined $max or do { $max = $_; next; }; # Set $max to the first non-undef value
$max = ($max > $_) ? $max : $_;
}
return $max;
}
sub sum {
# Returns:
# Sum of values of array
my @args = @_;
my $sum = 0;
for (@args) {
# Skip undefs
$_ and do { $sum += $_; }
}
return $sum;
}
sub undef_as_zero {
my $a = shift;
return $a ? $a : 0;
}
sub undef_as_empty {
my $a = shift;
return $a ? $a : "";
}
sub hostname {
if(not $Private::hostname) {
my $hostname = `hostname`;
chomp($hostname);
$Private::hostname = $hostname || "nohostname";
}
return $Private::hostname;
}
sub usleep {
# Sleep this many milliseconds.
my $secs = shift;
::debug("Sleeping ",$secs," millisecs\n");
select(undef, undef, undef, $secs/1000);
if($::opt_timeout) {
::debug(my_dump($Global::timeoutq));
$Global::timeoutq->process_timeouts();
}
}
sub multiply_binary_prefix {
# Evaluate numbers with binary prefix
# k=10^3, m=10^6, g=10^9, t=10^12, p=10^15, e=10^18, z=10^21, y=10^24
# K=2^10, M=2^20, G=2^30, T=2^40, P=2^50, E=2^60, Z=2^70, Y=2^80
# Ki=2^10, Mi=2^20, Gi=2^30, Ti=2^40, Pi=2^50, Ei=2^60, Zi=2^70, Yi=2^80
# ki=2^10, mi=2^20, gi=2^30, ti=2^40, pi=2^50, ei=2^60, zi=2^70, yi=2^80
# 13G = 13*1024*1024*1024 = 13958643712
my $s = shift;
$s =~ s/k/*1000/g;
$s =~ s/M/*1000*1000/g;
$s =~ s/G/*1000*1000*1000/g;
$s =~ s/T/*1000*1000*1000*1000/g;
$s =~ s/P/*1000*1000*1000*1000*1000/g;
$s =~ s/E/*1000*1000*1000*1000*1000*1000/g;
$s =~ s/Z/*1000*1000*1000*1000*1000*1000*1000/g;
$s =~ s/Y/*1000*1000*1000*1000*1000*1000*1000*1000/g;
$s =~ s/X/*1000*1000*1000*1000*1000*1000*1000*1000*1000/g;
$s =~ s/Ki?/*1024/gi;
$s =~ s/Mi?/*1024*1024/gi;
$s =~ s/Gi?/*1024*1024*1024/gi;
$s =~ s/Ti?/*1024*1024*1024*1024/gi;
$s =~ s/Pi?/*1024*1024*1024*1024*1024/gi;
$s =~ s/Ei?/*1024*1024*1024*1024*1024*1024/gi;
$s =~ s/Zi?/*1024*1024*1024*1024*1024*1024*1024/gi;
$s =~ s/Yi?/*1024*1024*1024*1024*1024*1024*1024*1024/gi;
$s =~ s/Xi?/*1024*1024*1024*1024*1024*1024*1024*1024*1024/gi;
$s = eval $s;
return $s;
}
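# Example (illustrative): "2k" is expanded to the string "2*1000",
# which is then eval'ed, so the returned value is the plain number 2000.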
sub __DEBUGGING__ {}
sub debug {
# Returns: N/A
$Global::debug or return;
@_ = grep { defined $_ ? $_ : "" } @_;
if($Global::original_stdout) {
print $Global::original_stdout @_;
} else {
print @_;
}
}
sub my_memory_usage {
# Returns:
# memory usage if found
# 0 otherwise
use strict;
use FileHandle;
my $pid = $$;
if(-e "/proc/$pid/stat") {
my $fh = FileHandle->new("</proc/$pid/stat");
my $data = <$fh>;
chomp $data;
$fh->close;
my @procinfo = split(/\s+/,$data);
return undef_as_zero($procinfo[22]);
} else {
return 0;
}
}
sub my_size {
# Returns:
# size of object if Devel::Size is installed
# -1 otherwise
my @size_this = (@_);
eval "use Devel::Size qw(size total_size)";
if ($@) {
return -1;
} else {
return total_size(@_);
}
}
sub my_dump {
# Returns:
# ascii expression of object if Data::Dump(er) is installed
# error code otherwise
my @dump_this = (@_);
eval "use Data::Dump qw(dump);";
if ($@) {
# Data::Dump not installed
eval "use Data::Dumper;";
if ($@) {
my $err = "Neither Data::Dump nor Data::Dumper is installed\n".
"Not dumping output\n";
print $Global::original_stderr $err;
return $err;
} else {
return Dumper(@dump_this);
}
} else {
# Create a dummy Data::Dump:dump as Hans Schou sometimes has
# it undefined
eval "sub Data::Dump:dump {}";
eval "use Data::Dump qw(dump);";
return (Data::Dump::dump(@dump_this));
}
}
sub __OBJECT_ORIENTED_PARTS__ {}
package SSHLogin;
sub new {
my $class = shift;
my $sshlogin_string = shift;
my $ncpus;
if($sshlogin_string =~ s:^(\d*)/:: and $1) {
# Override default autodetected ncpus unless zero or missing
$ncpus = $1;
}
my $string = $sshlogin_string;
my @unget = ();
return bless {
'string' => $string,
'jobs_running' => 0,
'jobs_completed' => 0,
'maxlength' => undef,
'max_jobs_running' => undef,
'ncpus' => $ncpus,
'sshcommand' => undef,
'serverlogin' => undef,
'control_path_dir' => undef,
'control_path' => undef,
'loadavg_file' => $ENV{'HOME'} . "/.parallel/tmp/loadavg-" .
$$."-".$string,
'loadavg' => undef,
'last_loadavg_update' => 0,
'swap_activity_file' => $ENV{'HOME'} . "/.parallel/tmp/swap_activity-" .
$$."-".$string,
'swap_activity' => undef,
}, ref($class) || $class;
}
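# Illustrative examples of the constructor argument (not part of the code
# above):
#   SSHLogin->new("server")        -> ncpus autodetected later, string "server"
#   SSHLogin->new("8/user@server") -> ncpus forced to 8, string "user@server"
#   SSHLogin->new(":")             -> stored as ":", treated as the local
#                                     machine (no ssh) elsewhere in the code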
sub DESTROY {
my $self = shift;
# Remove temporary files if they are created.
unlink $self->{'loadavg_file'};
unlink $self->{'swap_activity_file'};
}
sub string {
my $self = shift;
return $self->{'string'};
}
sub jobs_running {
my $self = shift;
return ($self->{'jobs_running'} || "0");
}
sub inc_jobs_running {
my $self = shift;
$self->{'jobs_running'}++;
}
sub dec_jobs_running {
my $self = shift;
$self->{'jobs_running'}--;
}
#sub set_jobs_running {
# my $self = shift;
# $self->{'jobs_running'} = shift;
#}
sub set_maxlength {
my $self = shift;
$self->{'maxlength'} = shift;
}
sub maxlength {
my $self = shift;
return $self->{'maxlength'};
}
sub jobs_completed {
my $self = shift;
return $self->{'jobs_completed'};
}
sub inc_jobs_completed {
my $self = shift;
$self->{'jobs_completed'}++;
}
sub set_max_jobs_running {
my $self = shift;
if(defined $self->{'max_jobs_running'}) {
$Global::max_jobs_running -= $self->{'max_jobs_running'};
}
$self->{'max_jobs_running'} = shift;
if(defined $self->{'max_jobs_running'}) {
# max_jobs_running may be reset if -j points to a file that has changed
$Global::max_jobs_running += $self->{'max_jobs_running'};
}
}
sub swapping {
my $self = shift;
my $swapping = $self->swap_activity();
return (not defined $swapping or $swapping);
}
sub swap_activity {
# If the currently known swap activity is too old:
# Recompute a new one in the background
# Returns:
# last swap activity computed
my $self = shift;
# Should we update the swap_activity file?
my $update_swap_activity_file = 0;
if(-r $self->{'swap_activity_file'}) {
open(SWAP,"<".$self->{'swap_activity_file'}) || ::die_bug("swap_activity_file-r");
my $swap_out = <SWAP>;
close SWAP;
if($swap_out =~ /^(\d+)$/) {
$self->{'swap_activity'} = $1;
::debug("New swap_activity: ".$self->{'swap_activity'});
}
::debug("Last update: ".$self->{'last_swap_activity_update'});
if(time - $self->{'last_swap_activity_update'} > 10) {
# The last swap activity update was started more than 10 seconds ago
::debug("Older than 10 sec: ".$self->{'swap_activity_file'});
$update_swap_activity_file = 1;
}
} else {
::debug("No swap_activity file: ".$self->{'swap_activity_file'});
$self->{'swap_activity'} = undef;
$update_swap_activity_file = 1;
}
if($update_swap_activity_file) {
::debug("Updating swap_activity file".$self->{'swap_activity_file'});
$self->{'last_swap_activity_update'} = time;
-e $ENV{'HOME'}."/.parallel" or mkdir $ENV{'HOME'}."/.parallel";
-e $ENV{'HOME'}."/.parallel/tmp" or mkdir $ENV{'HOME'}."/.parallel/tmp";
my $swap_activity;
$swap_activity = ("vmstat 1 2 2>/dev/null | tail -n1 | awk '{print \$7*\$8}' || ".
# If the (remote) machine is Mac and the above fails, try this:
"vm_stat 1 | head -n 3 | tail -n1 | awk '{print \$9*\$10}'");
if($self->{'string'} ne ":") {
$swap_activity = $self->sshcommand() . " " . $self->serverlogin() . " " .
::shell_quote_scalar($swap_activity);
}
# Run swap_activity measuring.
# As the command can take long to run if run remote
# save it to a tmp file before moving it to the correct file
my $file = $self->{'swap_activity_file'};
my $tmpfile = $self->{'swap_activity_file'}.$$;
qx{ ($swap_activity > $tmpfile; mv $tmpfile $file) & };
}
return $self->{'swap_activity'};
}
sub loadavg_too_high {
my $self = shift;
my $loadavg = $self->loadavg();
return (not defined $loadavg or
$loadavg > $self->max_loadavg());
}
sub loadavg {
# If the currently known loadavg is too old:
# Recompute a new one in the background
# Returns:
# last load average computed
my $self = shift;
# Should we update the loadavg file?
my $update_loadavg_file = 0;
if(-r $self->{'loadavg_file'}) {
open(UPTIME,"<".$self->{'loadavg_file'}) || ::die_bug("loadavg_file-r");
my $uptime_out = <UPTIME>;
close UPTIME;
# load average: 0.76, 1.53, 1.45
if($uptime_out =~ /load average: (\d+\.\d+)/) {
$self->{'loadavg'} = $1;
::debug("New loadavg: ".$self->{'loadavg'});
} else {
::die_bug("loadavg_invalid_content: $uptime_out");
}
::debug("Last update: ".$self->{'last_loadavg_update'});
if(time - $self->{'last_loadavg_update'} > 10) {
# The last loadavg update was started more than 10 seconds ago
::debug("Older than 10 sec: ".$self->{'loadavg_file'});
$update_loadavg_file = 1;
}
} else {
::debug("No loadavg file: ".$self->{'loadavg_file'});
$self->{'loadavg'} = undef;
$update_loadavg_file = 1;
}
if($update_loadavg_file) {
::debug("Updating loadavg file".$self->{'loadavg_file'});
$self->{'last_loadavg_update'} = time;
-e $ENV{'HOME'}."/.parallel" or mkdir $ENV{'HOME'}."/.parallel";
-e $ENV{'HOME'}."/.parallel/tmp" or mkdir $ENV{'HOME'}."/.parallel/tmp";
my $uptime;
if($self->{'string'} eq ":") {
$uptime = "uptime";
} else {
$uptime = $self->sshcommand() . " " . $self->serverlogin() . " uptime";
}
# Run uptime.
# As the command can take long to run if run remote
# save it to a tmp file before moving it to the correct file
my $file = $self->{'loadavg_file'};
my $tmpfile = $self->{'loadavg_file'}.$$;
qx{ ($uptime > $tmpfile && mv $tmpfile $file) & };
}
return $self->{'loadavg'};
}
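# Rough timeline of the caching above (illustrative):
#   1st call: no loadavg_file yet -> returns undef and starts
#             "(uptime > tmpfile && mv tmpfile file) &" in the background
#   later calls: read the cached value; if the last update was started more
#             than 10 seconds ago, kick off a new background update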
sub max_loadavg {
my $self = shift;
if(not defined $self->{'max_loadavg'}) {
$self->{'max_loadavg'} =
$self->compute_max_loadavg($::opt_load);
}
::debug("max_loadavg: ".$self->string()." ".$self->{'max_loadavg'});
return $self->{'max_loadavg'};
}
sub set_max_loadavg {
my $self = shift;
$self->{'max_loadavg'} = shift;
}
sub compute_max_loadavg {
# Parse the max loadaverage that the user asked for using --load
# Returns:
# max loadaverage
my $self = shift;
my $loadspec = shift;
my $load;
if(defined $loadspec) {
if($loadspec =~ /^\+(\d+)$/) {
# E.g. --load +2
my $j = $1;
$load =
$self->ncpus() + $j;
} elsif ($loadspec =~ /^-(\d+)$/) {
# E.g. --load -2
my $j = $1;
$load =
$self->ncpus() - $j;
} elsif ($loadspec =~ /^(\d+)\%$/) {
my $j = $1;
$load =
$self->ncpus() * $j / 100;
} elsif ($loadspec =~ /^(\d+(\.\d+)?)$/) {
$load = $1;
} elsif (-f $loadspec) {
$Global::max_load_file = $loadspec;
$Global::max_load_file_last_mod = (stat($Global::max_load_file))[9];
if(open(IN, $Global::max_load_file)) {
my $opt_load_file = join("",<IN>);
close IN;
$load = $self->compute_max_loadavg($opt_load_file);
} else {
print $Global::original_stderr "Cannot open $loadspec\n";
exit(255);
}
} else {
print $Global::original_stderr "Parsing of --load failed\n";
::die_usage();
}
if($load < 0.01) {
$load = 0.01;
}
}
return $load;
}
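# Examples of --load parsing (illustrative, assuming ncpus() == 8):
#   --load +2  -> 10      --load -2  -> 6
#   --load 75% -> 6       --load 2.5 -> 2.5
#   --load <file> re-reads the file and parses its content as above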
sub max_jobs_running {
my $self = shift;
if(not defined $self->{'max_jobs_running'}) {
$self->set_max_jobs_running($self->compute_number_of_processes($::opt_P));
}
return $self->{'max_jobs_running'};
}
sub compute_number_of_processes {
# Number of processes wanted and limited by system resources
# Returns:
# Number of processes
my $self = shift;
my $opt_P = shift;
my $wanted_processes = $self->user_requested_processes($opt_P);
if(not defined $wanted_processes) {
$wanted_processes = $Global::default_simultaneous_sshlogins;
}
::debug("Wanted procs: $wanted_processes\n");
my $system_limit =
$self->processes_available_by_system_limit($wanted_processes);
$system_limit < 1 and ::die_bug('$system_limit < 1');
::debug("Limited to procs: $system_limit\n");
return $system_limit;
}
sub processes_available_by_system_limit {
# If the wanted number of processes is bigger than the system limits:
# Limit them to the system limits
# Limits are: File handles, number of input lines, processes,
# and taking > 1 second to spawn 10 extra processes
# Returns:
# Number of processes
my $self = shift;
my $wanted_processes = shift;
my $system_limit = 0;
my @jobs = ();
my $job;
my @args = ();
my $arg;
my $more_filehandles = 1;
my $max_system_proc_reached = 0;
my $slow_spawning_warning_printed = 0;
my $time = time;
my %fh;
my @children;
::do_not_reap();
# Reserve filehandles
# perl uses 7 filehandles for something?
# parallel uses 1 for memory_usage
for my $i (1..8) {
open($fh{"init-$i"},"</dev/null");
}
my $count_jobs_already_read = $Global::JobQueue->next_seq();
my $wait_time_for_getting_args = 0;
my $start_time = time;
while(1) {
$system_limit >= $wanted_processes and last;
not $more_filehandles and last;
$max_system_proc_reached and last;
my $before_getting_arg = time;
if($Global::semaphore) {
# Skip
} elsif(defined $::opt_retries and $count_jobs_already_read) {
# For retries we may need to run all jobs on this sshlogin
# so include the already read jobs for this sshlogin
$count_jobs_already_read--;
} else {
if($::opt_X or $::opt_m) {
# The arguments may have to be re-spread over several jobslots
# So pessimistically only read one arg per jobslot
# instead of a full commandline
if($Global::JobQueue->{'commandlinequeue'}->{'arg_queue'}->empty()) {
if($Global::JobQueue->empty()) {
last;
} else {
($job) = $Global::JobQueue->get();
push(@jobs, $job);
}
} else {
($arg) = $Global::JobQueue->{'commandlinequeue'}->{'arg_queue'}->get();
push(@args, $arg);
}
} else {
# If there are no more command lines, then we have a process
# per command line, so no need to go further
$Global::JobQueue->empty() and last;
($job) = $Global::JobQueue->get();
push(@jobs, $job);
}
}
$wait_time_for_getting_args += time - $before_getting_arg;
$system_limit++;
# Every simultaneous process uses 2 filehandles when grouping
$more_filehandles = open($fh{$system_limit*10},"</dev/null")
&& open($fh{$system_limit*10+2},"</dev/null");
# System process limit
my $child;
if($child = fork()) {
push (@children,$child);
$Global::unkilled_children{$child} = 1;
} elsif(defined $child) {
# The child takes one process slot
# It will be killed later
$SIG{TERM} = $Global::original_sig{TERM};
sleep 10000000;
exit(0);
} else {
$max_system_proc_reached = 1;
}
my $forktime = time - $time - $wait_time_for_getting_args;
::debug("Time to fork $system_limit procs: $wait_time_for_getting_args ",
$forktime,
" (processes so far: ", $system_limit,")\n");
if($system_limit > 10 and
$forktime > 1 and
$forktime > $system_limit * 0.01
and not $slow_spawning_warning_printed) {
# It took more than 0.01 second on average to fork a process.
# Give the user a warning. He can press Ctrl-C if this
# sucks.
print $Global::original_stderr
("parallel: Warning: Starting $system_limit processes took > $forktime sec.\n",
"Consider adjusting -j. Press CTRL-C to stop.\n");
$slow_spawning_warning_printed = 1;
}
}
if($system_limit < $wanted_processes and not $more_filehandles) {
print $Global::original_stderr
("parallel: Warning: Only enough filehandles to run ",
$system_limit, " jobs in parallel. ",
"Raising ulimit -n may help.\n");
}
if($system_limit < $wanted_processes and $max_system_proc_reached) {
print $Global::original_stderr
("parallel: Warning: Only enough available processes to run ",
$system_limit, " jobs in parallel.\n");
}
if($Global::JobQueue->empty()) {
$system_limit ||= 1;
}
# Cleanup: Close the files
for (values %fh) { close $_ }
# Cleanup: Kill the children
for my $pid (@children) {
kill 9, $pid;
waitpid($pid,0);
delete $Global::unkilled_children{$pid};
}
#wait();
# Cleanup: Unget the command_lines or the @args
$Global::JobQueue->{'commandlinequeue'}->{'arg_queue'}->unget(@args);
$Global::JobQueue->unget(@jobs);
if($self->string() ne ":" and
$system_limit > $Global::default_simultaneous_sshlogins) {
$system_limit =
$self->simultaneous_sshlogin_limit($system_limit);
}
return $system_limit;
}
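# Example of the limiting above (illustrative numbers): with -j 500 but only
# enough spare filehandles for ~120 jobs (2 filehandles per job when
# grouping) the loop stops early, the "Only enough filehandles" warning is
# printed and ~120 is returned.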
sub simultaneous_sshlogin_limit {
# Test by logging in wanted number of times simultaneously
# Returns:
# min($wanted_processes,$working_simultaneous_ssh_logins-1)
my $self = shift;
my $wanted_processes = shift;
# Try twice because it guesses wrong sometimes
# Choose the minimal
my $ssh_limit =
::min($self->simultaneous_sshlogin($wanted_processes),
$self->simultaneous_sshlogin($wanted_processes));
if($ssh_limit < $wanted_processes) {
my $serverlogin = $self->serverlogin();
print $Global::original_stderr
("parallel: Warning: ssh to $serverlogin only allows ",
"for $ssh_limit simultaneous logins.\n",
"You may raise this by changing ",
"/etc/ssh/sshd_config:MaxStartup on $serverlogin\n",
"Using only ",$ssh_limit-1," connections ",
"to avoid race conditions\n");
}
# A race condition can cause problems if all ssh connections are used.
if($ssh_limit > 1) { $ssh_limit -= 1; }
return $ssh_limit;
}
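# Example (illustrative): 64 jobs wanted, but both test logins only manage 10
# simultaneous ssh connections -> warn and use min(10,10)-1 = 9 to avoid the
# race condition.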
sub simultaneous_sshlogin {
# Using $sshlogin try to see if we can do $wanted_processes
# simultaneous logins
# (ssh host echo simultaneouslogin & ssh host echo simultaneouslogin & ...)|grep simul|wc -l
# Returns:
# Number of successful logins
my $self = shift;
my $wanted_processes = shift;
my $sshcmd = $self->sshcommand();
my $serverlogin = $self->serverlogin();
my $cmd = "$sshcmd $serverlogin echo simultaneouslogin 2>&1 &"x$wanted_processes;
::debug("Trying $wanted_processes logins at $serverlogin");
open (SIMUL, "($cmd)|grep simultaneouslogin | wc -l|") or
::die_bug("simultaneouslogin");
my $ssh_limit = <SIMUL>;
close SIMUL;
chomp $ssh_limit;
return $ssh_limit;
}
sub set_ncpus {
my $self = shift;
$self->{'ncpus'} = shift;
}
sub user_requested_processes {
# Parse the number of processes that the user asked for using -j
# Returns:
# the number of processes to run on this sshlogin
my $self = shift;
my $opt_P = shift;
my $processes;
if(defined $opt_P) {
if($opt_P =~ /^\+(\d+)$/) {
# E.g. -P +2
my $j = $1;
$processes =
$self->ncpus() + $j;
} elsif ($opt_P =~ /^-(\d+)$/) {
# E.g. -P -2
my $j = $1;
$processes =
$self->ncpus() - $j;
} elsif ($opt_P =~ /^(\d+)\%$/) {
my $j = $1;
$processes =
$self->ncpus() * $j / 100;
} elsif ($opt_P =~ /^(\d+)$/) {
$processes = $1;
if($processes == 0) {
# -P 0 = infinity (or at least close)
$processes = $Global::infinity;
}
} elsif (-f $opt_P) {
$Global::max_procs_file = $opt_P;
$Global::max_procs_file_last_mod = (stat($Global::max_procs_file))[9];
if(open(IN, $Global::max_procs_file)) {
my $opt_P_file = join("",<IN>);
close IN;
$processes = $self->user_requested_processes($opt_P_file);
} else {
print $Global::original_stderr "Cannot open $opt_P\n";
exit(255);
}
} else {
print $Global::original_stderr "Parsing of --jobs/-j/--max-procs/-P failed\n";
::die_usage();
}
if($processes < 1) {
$processes = 1;
}
}
return $processes;
}
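# Examples of -j/-P parsing (illustrative, assuming ncpus() == 8):
#   -j +2 -> 10    -j -2 -> 6    -j 50% -> 4    -j 16 -> 16
#   -j 0  -> $Global::infinity   -j <file> -> parse the file content as above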
sub ncpus {
my $self = shift;
if(not defined $self->{'ncpus'}) {
my $sshcmd = $self->sshcommand();
my $serverlogin = $self->serverlogin();
if($serverlogin eq ":") {
if($::opt_use_cpus_instead_of_cores) {
$self->{'ncpus'} = no_of_cpus();
} else {
$self->{'ncpus'} = no_of_cores();
}
} else {
my $ncpu;
if($::opt_use_cpus_instead_of_cores) {
$ncpu = qx(echo|$sshcmd $serverlogin parallel --number-of-cpus);
} else {
$ncpu = qx(echo|$sshcmd $serverlogin parallel --number-of-cores);
}
chomp $ncpu;
if($ncpu =~ /^\s*[0-9]+\s*$/s) {
$self->{'ncpus'} = $ncpu;
} else {
print $Global::original_stderr
("parallel: Warning: Could not figure out ",
"number of cpus on $serverlogin ($ncpu). Using 1\n");
$self->{'ncpus'} = 1;
}
}
}
return $self->{'ncpus'};
}
sub no_of_cpus {
# Returns:
# Number of physical CPUs
local $/="\n"; # If delimiter is set, then $/ will be wrong
my $no_of_cpus;
if ($^O eq 'linux') {
$no_of_cpus = no_of_cpus_gnu_linux();
} elsif ($^O eq 'freebsd') {
$no_of_cpus = no_of_cpus_freebsd();
} elsif ($^O eq 'solaris') {
$no_of_cpus = no_of_cpus_solaris();
} elsif ($^O eq 'aix') {
$no_of_cpus = no_of_cpus_aix();
} elsif ($^O eq 'darwin') {
$no_of_cpus = no_of_cpus_darwin();
} else {
$no_of_cpus = (no_of_cpus_freebsd()
|| no_of_cpus_darwin()
|| no_of_cpus_solaris()
|| no_of_cpus_aix()
|| no_of_cpus_gnu_linux()
);
}
if($no_of_cpus) {
chomp $no_of_cpus;
return $no_of_cpus;
} else {
warn("parallel: Cannot figure out number of cpus. Using 1");
return 1;
}
}
sub no_of_cores {
# Returns:
# Number of CPU cores
local $/="\n"; # If delimiter is set, then $/ will be wrong
my $no_of_cores;
if ($^O eq 'linux') {
$no_of_cores = no_of_cores_gnu_linux();
} elsif ($^O eq 'freebsd') {
$no_of_cores = no_of_cores_freebsd();
} elsif ($^O eq 'solaris') {
$no_of_cores = no_of_cores_solaris();
} elsif ($^O eq 'aix') {
$no_of_cores = no_of_cores_aix();
} elsif ($^O eq 'darwin') {
$no_of_cores = no_of_cores_darwin();
} else {
$no_of_cores = (no_of_cores_freebsd()
|| no_of_cores_darwin()
|| no_of_cores_solaris()
|| no_of_cores_aix()
|| no_of_cores_gnu_linux()
);
}
if($no_of_cores) {
chomp $no_of_cores;
return $no_of_cores;
} else {
warn("parallel: Cannot figure out number of CPU cores. Using 1");
return 1;
}
}
sub no_of_cpus_gnu_linux {
# Returns:
# Number of physical CPUs on GNU/Linux
# undef if not GNU/Linux
my $no_of_cpus;
if(-e "/proc/cpuinfo") {
$no_of_cpus = 0;
my %seen;
open(IN,"cat /proc/cpuinfo|") || return undef;
while(<IN>) {
if(/^physical id.*[:](.*)/ and not $seen{$1}++) {
$no_of_cpus++;
}
}
close IN;
}
return $no_of_cpus;
}
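# Example /proc/cpuinfo fragment (illustrative): two lines "physical id : 0"
# and two lines "physical id : 1" give no_of_cpus_gnu_linux() == 2, as only
# distinct ids are counted via %seen.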
sub no_of_cores_gnu_linux {
# Returns:
# Number of CPU cores on GNU/Linux
# undef if not GNU/Linux
my $no_of_cores;
if(-e "/proc/cpuinfo") {
$no_of_cores = 0;
open(IN,"cat /proc/cpuinfo|") || return undef;
while(<IN>) {
/^processor.*[:]/ and $no_of_cores++;
}
close IN;
}
return $no_of_cores;
}
sub no_of_cpus_darwin {
# Returns:
# Number of physical CPUs on Mac Darwin
# undef if not Mac Darwin
my $no_of_cpus =
(`sysctl -n hw.physicalcpu 2>/dev/null`
or
`sysctl -a hw 2>/dev/null | grep -w physicalcpu | awk '{ print \$2 }'`);
return $no_of_cpus;
}
sub no_of_cores_darwin {
# Returns:
# Number of CPU cores on Mac Darwin
# undef if not Mac Darwin
my $no_of_cores =
(`sysctl -n hw.logicalcpu 2>/dev/null`
or
`sysctl -a hw 2>/dev/null | grep -w logicalcpu | awk '{ print \$2 }'`);
return $no_of_cores;
}
sub no_of_cpus_freebsd {
# Returns:
# Number of physical CPUs on FreeBSD
# undef if not FreeBSD
my $no_of_cpus =
(`sysctl -a dev.cpu 2>/dev/null | grep \%parent | awk '{ print \$2 }' | uniq | wc -l | awk '{ print \$1 }'`
or
`sysctl hw.ncpu 2>/dev/null | awk '{ print \$2 }'`);
chomp $no_of_cpus;
return $no_of_cpus;
}
sub no_of_cores_freebsd {
# Returns:
# Number of CPU cores on FreeBSD
# undef if not FreeBSD
my $no_of_cores =
(`sysctl hw.ncpu 2>/dev/null | awk '{ print \$2 }'`
or
`sysctl -a hw 2>/dev/null | grep -w logicalcpu | awk '{ print \$2 }'`);
chomp $no_of_cores;
return $no_of_cores;
}
sub no_of_cpus_solaris {
# Returns:
# Number of physical CPUs on Solaris
# undef if not Solaris
if(-x "/usr/sbin/psrinfo") {
my @psrinfo = `/usr/sbin/psrinfo`;
if($#psrinfo >= 0) {
return $#psrinfo +1;
}
}
if(-x "/usr/sbin/prtconf") {
my @prtconf = `/usr/sbin/prtconf | grep cpu..instance`;
if($#prtconf >= 0) {
return $#prtconf +1;
}
}
return undef;
}
sub no_of_cores_solaris {
# Returns:
# Number of CPU cores on Solaris
# undef if not Solaris
if(-x "/usr/sbin/psrinfo") {
my @psrinfo = `/usr/sbin/psrinfo`;
if($#psrinfo >= 0) {
return $#psrinfo +1;
}
}
if(-x "/usr/sbin/prtconf") {
my @prtconf = `/usr/sbin/prtconf | grep cpu..instance`;
if($#prtconf >= 0) {
return $#prtconf +1;
}
}
return undef;
}
sub no_of_cpus_aix {
# Returns:
# Number of physical CPUs on AIX
# undef if not AIX
my $no_of_cpus = 0;
if(-x "/usr/sbin/lscfg") {
open(IN,"/usr/sbin/lscfg -vs |grep proc | wc -l|tr -d ' ' |")
|| return undef;
$no_of_cpus = <IN>;
chomp ($no_of_cpus);
close IN;
}
return $no_of_cpus;
}
sub no_of_cores_aix {
# Returns:
# Number of CPU cores on AIX
# undef if not AIX
my $no_of_cores;
if(-x "/usr/bin/vmstat") {
open(IN,"/usr/bin/vmstat 1 1|") || return undef;
while(<IN>) {
/lcpu=([0-9]*) / and $no_of_cores = $1;
}
close IN;
}
return $no_of_cores;
}
sub sshcommand {
my $self = shift;
if (not defined $self->{'sshcommand'}) {
$self->sshcommand_of_sshlogin();
}
return $self->{'sshcommand'};
}
sub serverlogin {
my $self = shift;
if (not defined $self->{'serverlogin'}) {
$self->sshcommand_of_sshlogin();
}
return $self->{'serverlogin'};
}
sub sshcommand_of_sshlogin {
# 'server' -> ('ssh','server'), or with --controlmaster:
# ('ssh -S <control_path_dir>/ssh-%r@%h:%p','server')
# 'user@server' -> ('ssh','user@server')
# 'myssh user@server' -> ('myssh','user@server')
# 'myssh -l user server' -> ('myssh -l user','server')
# '/usr/bin/myssh -l user server' -> ('/usr/bin/myssh -l user','server')
# Returns:
# sshcommand - defaults to 'ssh'
# login@host
my $self = shift;
my ($sshcmd, $serverlogin);
if($self->{'string'} =~ /(.+) (\S+)$/) {
# Own ssh command
$sshcmd = $1; $serverlogin = $2;
} else {
# Normal ssh
if($::opt_controlmaster) {
# Use control_path to make ssh faster
my $control_path = $self->control_path_dir()."/ssh-%r@%h:%p";
$sshcmd = "ssh -S ".$control_path;
$serverlogin = $self->{'string'};
my $master = "ssh -MTS $control_path $serverlogin sleep 1";
if(not $self->{'control_path'}{$control_path}++) {
# Master is not running for this control_path
# Start it
my $pid = fork();
if($pid) {
$Global::sshmaster{$pid}++;
} else {
::debug($master,"\n");
`$master`;
::wait_and_exit(0);
}
}
} else {
$sshcmd = "ssh"; $serverlogin = $self->{'string'};
}
}
$self->{'sshcommand'} = $sshcmd;
$self->{'serverlogin'} = $serverlogin;
}
sub control_path_dir {
# Returns:
# path to directory
my $self = shift;
if(not defined $self->{'control_path_dir'}) {
-e $ENV{'HOME'}."/.parallel" or mkdir $ENV{'HOME'}."/.parallel";
-e $ENV{'HOME'}."/.parallel/tmp" or mkdir $ENV{'HOME'}."/.parallel/tmp";
$self->{'control_path_dir'} =
File::Temp::tempdir($ENV{'HOME'}
. "/.parallel/tmp/control_path_dir-XXXX",
CLEANUP => 1);
}
return $self->{'control_path_dir'};
}
package JobQueue;
sub new {
my $class = shift;
my $command = shift;
my $read_from = shift;
my $context_replace = shift;
my $max_number_of_args = shift;
my $return_files = shift;
my $commandlinequeue = CommandLineQueue->new(
$command,$read_from,$context_replace,$max_number_of_args,$return_files);
my @unget = ();
return bless {
'unget' => \@unget,
'commandlinequeue' => $commandlinequeue,
'total_jobs' => undef,
}, ref($class) || $class;
}
sub get {
my $self = shift;
if(@{$self->{'unget'}}) {
my $job = shift @{$self->{'unget'}};
return ($job);
} else {
my $commandline = $self->{'commandlinequeue'}->get();
if(defined $commandline) {
my $job = Job->new($commandline);
return $job;
} else {
return undef;
}
}
}
sub unget {
my $self = shift;
unshift @{$self->{'unget'}}, @_;
}
sub empty {
my $self = shift;
my $empty = (not @{$self->{'unget'}})
&& $self->{'commandlinequeue'}->empty();
::debug("JobQueue->empty $empty\n");
return $empty;
}
sub total_jobs {
my $self = shift;
if(not defined $self->{'total_jobs'}) {
my $job;
my @queue;
while($job = $self->get()) {
push @queue, $job;
}
$self->unget(@queue);
$self->{'total_jobs'} = $#queue+1;
}
return $self->{'total_jobs'};
}
sub next_seq {
my $self = shift;
return $self->{'commandlinequeue'}->seq();
}
sub quote_args {
my $self = shift;
return $self->{'commandlinequeue'}->quote_args();
}
package Job;
sub new {
my $class = shift;
my $commandline = shift;
return bless {
'commandline' => $commandline, # The commandline with no args
'workdir' => undef, # --workdir
'stdin' => undef, # filehandle for stdin (used for --pipe)
'stdout' => undef, # filehandle for stdout (used for --group)
# filename for writing stdout to (used for --files)
'stdoutfilename' => undef,
'stderr' => undef, # filehandle for stderr (used for --group)
'remaining' => "", # remaining data not sent to stdin (used for --pipe)
'datawritten' => 0, # amount of data sent via stdin (used for --pipe)
'transfersize' => 0, # size of files using --transfer
'returnsize' => 0, # size of files using --return
'pid' => undef,
# hash of { SSHLogins => number of times the command failed there }
'failed' => undef,
'sshlogin' => undef,
# The commandline wrapped with rsync and ssh
'sshlogin_wrap' => undef,
'exitstatus' => undef,
'exitsignal' => undef,
# Timestamp for timeout if any
'timeout' => undef,
}, ref($class) || $class;
}
sub replaced {
my $self = shift;
$self->{'commandline'} or Carp::croak("cmdline empty");
return $self->{'commandline'}->replaced();
}
sub seq {
my $self = shift;
return $self->{'commandline'}->seq();
}
sub set_stdout {
my $self = shift;
$self->{'stdout'} = shift;
}
sub stdout {
my $self = shift;
return $self->{'stdout'};
}
sub set_stdoutfilename {
my $self = shift;
$self->{'stdoutfilename'} = shift;
}
sub stdoutfilename {
my $self = shift;
return $self->{'stdoutfilename'};
}
sub stderr {
my $self = shift;
return $self->{'stderr'};
}
sub set_stderr {
my $self = shift;
$self->{'stderr'} = shift;
}
sub stdin {
my $self = shift;
return $self->{'stdin'};
}
sub set_stdin {
my $self = shift;
my $stdin = shift;
# set non-blocking
fcntl($stdin, ::F_SETFL, ::O_NONBLOCK) or
::die_bug("Couldn't set flags for HANDLE: $!");
$self->{'stdin'} = $stdin;
}
sub write {
my $self = shift;
my $remaining_ref = shift;
if(length($$remaining_ref)) {
$self->{'remaining'} .= $$remaining_ref;
$self->complete_write();
}
}
sub complete_write {
# Returns:
# number of bytes written (see syswrite)
my $self = shift;
my $in = $self->{'stdin'};
my $len = syswrite($in,$self->{'remaining'});
if (!defined($len) && $! == &::EAGAIN) {
# write would block;
} else {
# Remove the part that was written
substr($self->{'remaining'},0,$len) = "";
$self->{'datawritten'} += $len;
}
return $len;
}
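# Example (illustrative): if 'remaining' holds 10000 bytes and syswrite only
# accepts 4096, those 4096 bytes are removed from 'remaining', 'datawritten'
# grows by 4096 and the rest is retried on a later call; EAGAIN leaves
# 'remaining' untouched.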
sub remaining {
my $self = shift;
if(defined $self->{'remaining'}) {
return length $self->{'remaining'};
} else {
return undef;
}
}
sub datawritten {
my $self = shift;
return $self->{'datawritten'};
}
sub pid {
my $self = shift;
return $self->{'pid'};
}
sub set_pid {
my $self = shift;
$self->{'pid'} = shift;
}
sub starttime {
my $self = shift;
return $self->{'starttime'};
}
sub set_starttime {
my $self = shift;
my $starttime = shift || time;
$self->{'starttime'} = $starttime;
}
sub runtime {
my $self = shift;
return $self->{'endtime'}-$self->{'starttime'};
}
sub endtime {
my $self = shift;
return $self->{'endtime'};
}
sub set_endtime {
my $self = shift;
my $endtime = shift || time;
$self->{'endtime'} = $endtime;
}
sub set_timeout {
my $self = shift;
my $delta_time = shift;
$self->{'timeout'} = time + $delta_time;
}
sub timeout {
my $self = shift;
return $self->{'timeout'};
}
sub timedout {
my $self = shift;
return time > $self->{'timeout'};
}
sub kill {
# kill the jobs
my $self = shift;
my @family_pids = $self->family_pids();
# Record this job as failed
$self->set_exitstatus(1);
# Send two TERMs to give time to clean up
for my $signal ("TERM", "TERM", "KILL") {
my $alive = 0;
for my $pid (@family_pids) {
if(kill 0, $pid) {
# The job still running
kill $signal, $pid;
$alive = 1;
}
}
# Wait 200 ms between TERMs - but only if any pids are alive
if($signal eq "TERM" and $alive) { ::usleep(200); }
}
}
sub family_pids {
# Find the pids with this->pid as (grand)*parent
# TODO test this on different OS as 'ps' is known to be different
my $self = shift;
my $pid = $self->pid();
my $script = q{
family_pids() {
for CHILDPID in `ps --ppid "$@" -o pid --no-headers`; do
family_pids $CHILDPID &
done
echo "$@"
}
} .
"family_pids $pid; wait";
my @pids = qx{$script};
chomp(@pids);
return ($pid,@pids);
}
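# Example (illustrative): for pid 1234 with children 1240 and 1241 the shell
# script prints the child pids and 1234 itself (in no guaranteed order), so
# the sub returns 1234 followed by that list.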
sub failed {
# return number of times failed for this $sshlogin
my $self = shift;
my $sshlogin = shift;
return $self->{'failed'}{$sshlogin};
}
sub failed_here {
# return number of times failed for the current $sshlogin
my $self = shift;
return $self->{'failed'}{$self->sshlogin()};
}
sub add_failed {
# increase the number of times failed for this $sshlogin
my $self = shift;
my $sshlogin = shift;
$self->{'failed'}{$sshlogin}++;
}
sub add_failed_here {
# increase the number of times failed for the current $sshlogin
my $self = shift;
$self->{'failed'}{$self->sshlogin()}++;
}
sub reset_failed {
# Reset the number of times failed for this $sshlogin
my $self = shift;
my $sshlogin = shift;
delete $self->{'failed'}{$sshlogin};
}
sub reset_failed_here {
# Reset the number of times failed for the current $sshlogin
my $self = shift;
delete $self->{'failed'}{$self->sshlogin()};
}
sub min_failed {
# Returns:
# the number of sshlogins this command has failed on
# the minimal number of times this command has failed
my $self = shift;
my $min_failures =
::min(map { $self->{'failed'}{$_} }
keys %{$self->{'failed'}});
my $number_of_sshlogins_failed_on = scalar keys %{$self->{'failed'}};
return ($number_of_sshlogins_failed_on,$min_failures);
}
sub total_failed {
# Returns:
# the number of times this command has failed
my $self = shift;
my $total_failures = 0;
for (values %{$self->{'failed'}}) {
$total_failures += $_;
}
return ($total_failures);
}
sub set_sshlogin {
my $self = shift;
my $sshlogin = shift;
$self->{'sshlogin'} = $sshlogin;
delete $self->{'sshlogin_wrap'}; # If sshlogin is changed the wrap is wrong
}
sub sshlogin {
my $self = shift;
return $self->{'sshlogin'};
}
sub sshlogin_wrap {
# Wrap the command with the commands needed to run remotely
my $self = shift;
if(not defined $self->{'sshlogin_wrap'}) {
my $sshlogin = $self->sshlogin();
my $sshcmd = $sshlogin->sshcommand();
my $serverlogin = $sshlogin->serverlogin();
my $next_command_line = $self->replaced();
my ($pre,$post,$cleanup)=("","","");
if($serverlogin eq ":") {
$self->{'sshlogin_wrap'} = $next_command_line;
} else {
# --transfer
$pre .= $self->sshtransfer();
# --return
$post .= $self->sshreturn();
# --cleanup
$post .= $self->sshcleanup();
if($post) {
# We need to save the exit status of the job
$post = '_EXIT_status=$?; ' . $post . ' exit $_EXIT_status;';
}
# If the remote login shell is (t)csh then use 'setenv'
# otherwise use 'export'
my $parallel_env =
q{'eval `echo $SHELL | grep -E "/(t)?csh" > /dev/null}
. q{ && echo setenv PARALLEL_SEQ '$PARALLEL_SEQ'\;}
. q{ setenv PARALLEL_PID '$PARALLEL_PID'}
. q{ || echo PARALLEL_SEQ='$PARALLEL_SEQ'\;export PARALLEL_SEQ\;}
. q{PARALLEL_PID='$PARALLEL_PID'\;export PARALLEL_PID` ;'};
if($::opt_workdir) {
$self->{'sshlogin_wrap'} =
($pre . "$sshcmd $serverlogin $parallel_env "
. ::shell_quote_scalar("cd ".$self->workdir()." && ")
. ::shell_quote_scalar($next_command_line).";".$post);
} else {
$self->{'sshlogin_wrap'} =
($pre . "$sshcmd $serverlogin $parallel_env "
. ::shell_quote_scalar($next_command_line).";".$post);
}
}
}
return $self->{'sshlogin_wrap'};
}
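# Sketch of the resulting command for a remote job (simplified; the real
# string is built with shell_quote_scalar and the $parallel_env snippet
# above):
#   <sshtransfer rsyncs>; ssh <serverlogin> '<env setup>' '<command>';
#   _EXIT_status=$?; <sshreturn/sshcleanup>; exit $_EXIT_status;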
sub transfer {
# Files to transfer
my $self = shift;
my @transfer = ();
$self->{'transfersize'} = 0;
if($::opt_transfer) {
for my $record (@{$self->{'commandline'}{'arg_list'}}) {
# Merge arguments from records into args
for my $arg (@$record) {
CORE::push @transfer, $arg->orig();
# filesize
if(-e $arg->orig()) {
$self->{'transfersize'} += (stat($arg->orig()))[7];
}
}
}
}
return @transfer;
}
sub transfersize {
my $self = shift;
return $self->{'transfersize'};
}
sub sshtransfer {
my $self = shift;
my $sshlogin = $self->sshlogin();
my $sshcmd = $sshlogin->sshcommand();
my $serverlogin = $sshlogin->serverlogin();
my $rsync_opt = "-rlDzR -e".::shell_quote_scalar($sshcmd);
my $pre = "";
for my $file ($self->transfer()) {
$file =~ s:/\./:/:g; # Rsync treats /./ specially. We don't want that
$file =~ s:^\./::g; # Remove ./ if any
my $relpath = ($file !~ m:^/:); # Is the path relative?
# Use different subdirs depending on abs or rel path
# Abs path: rsync -rlDzR /home/tange/dir/subdir/file.gz server:/
# Rel path: rsync -rlDzR ./subdir/file.gz server:.parallel/tmp/tempid/
# Rel path: rsync -rlDzR ./subdir/file.gz server:$workdir/
my $remote_workdir = $self->workdir($file);
my $rsync_destdir = ($relpath ? $remote_workdir : "/");
if($relpath) {
$file = "./".$file;
}
if(-r $file) {
my $mkremote_workdir =
$remote_workdir eq "." ? "true" :
"ssh $serverlogin mkdir -p $rsync_destdir";
$pre .= "$mkremote_workdir; rsync $rsync_opt "
. ::shell_quote_scalar($file)." $serverlogin:$rsync_destdir;";
} else {
print $Global::original_stderr
"parallel: Warning: "
. $file . " is not readable and will not be transferred\n";
}
}
return $pre;
}
sub return {
# Files to return
# Quoted and with {...} substituted
my $self = shift;
my @return = ();
for my $return (@{$self->{'commandline'}{'return_files'}}) {
CORE::push @return,
$self->{'commandline'}->replace_placeholders($return,1);
}
return @return;
}
sub returnsize {
# This is called after the job has finished
my $self = shift;
for my $file ($self->return()) {
if(-e $file) {
$self->{'returnsize'} += (stat($file))[7];
}
}
return $self->{'returnsize'};
}
sub sshreturn {
my $self = shift;
my $sshlogin = $self->sshlogin();
my $sshcmd = $sshlogin->sshcommand();
my $serverlogin = $sshlogin->serverlogin();
my $rsync_opt = "-rlDzR -e".::shell_quote_scalar($sshcmd);
my $pre = "";
for my $file ($self->return()) {
$file =~ s:/\./:/:g; # Rsync treats /./ specially. We don't want that
$file =~ s:^\./::g; # Remove ./ if any
my $relpath = ($file !~ m:^/:); # Is the path relative?
# Use different subdirs depending on abs or rel path
# Return or cleanup
my @cmd = ();
my $rsync_destdir = ($relpath ? "./" : "/");
my $ret_file = $file;
my $remove = $::opt_cleanup ? "--remove-source-files" : "";
# If relative path: prepend workdir/./ to avoid problems
# if the dir contains ':' and to get the right relative return path
my $replaced = ($relpath ? $self->workdir()."/./" : "") . $file;
# --return
# Abs path: rsync -rlDzR server:/home/tange/dir/subdir/file.gz /
# Rel path: rsync -rlDzR server:./subdir/file.gz ./
$pre .= "rsync $rsync_opt $remove $serverlogin:".
::shell_quote_scalar($replaced) . " ".$rsync_destdir.";";
}
return $pre;
}
sub sshcleanup {
# Return the sshcommand needed to remove the file
# Returns:
# ssh command needed to remove files from sshlogin
my $self = shift;
my $sshlogin = $self->sshlogin();
my $sshcmd = $sshlogin->sshcommand();
my $serverlogin = $sshlogin->serverlogin();
my $workdir = $self->workdir();
my $removeworkdir = "";
my $cleancmd = "";
for my $file ($self->cleanup()) {
my @subworkdirs = parentdirs_of($file);
$file = ::shell_quote_scalar($file);
if(@subworkdirs) {
$removeworkdir = "; rmdir 2>/dev/null ".
join(" ",map { ::shell_quote_scalar($workdir."/".$_) }
@subworkdirs);
}
my $relpath = ($file !~ m:^/:); # Is the path relative?
my $cleandir = ($relpath ? $workdir."/" : "");
$cleancmd .= "$sshcmd $serverlogin rm -f "
. ::shell_quote_scalar($cleandir.$file.$removeworkdir).";";
}
return $cleancmd;
}
sub cleanup {
# Returns:
# Files to remove at cleanup
my $self = shift;
if($::opt_cleanup) {
my @transfer = $self->transfer();
return @transfer;
} else {
return ();
}
}
sub workdir {
# Returns:
# the workdir on a remote machine
my $self = shift;
if(not defined $self->{'workdir'}) {
my $workdir;
if(defined $::opt_workdir) {
if($::opt_workdir eq ".") {
# . means current dir
my $home = $ENV{'HOME'};
eval 'use Cwd';
my $cwd = cwd();
$::opt_workdir = $cwd;
if($home) {
# If homedir exists: remove the homedir from
# workdir if cwd starts with homedir
# E.g. /home/foo/my/dir => my/dir
# E.g. /tmp/my/dir => /tmp/my/dir
my ($home_dev, $home_ino) = (stat($home))[0,1];
my $parent = "";
my @dir_parts = split(m:/:,$cwd);
my $part;
while(defined ($part = shift @dir_parts)) {
$part eq "" and next;
$parent .= "/".$part;
my ($parent_dev, $parent_ino) = (stat($parent))[0,1];
if($parent_dev == $home_dev and $parent_ino == $home_ino) {
# dev and ino is the same: We found the homedir.
$::opt_workdir = join("/",@dir_parts);
last;
}
}
}
} elsif($::opt_workdir eq "...") {
$workdir = ".parallel/tmp/" . ::hostname() . "-" . $$
. "-" . $self->seq();
} else {
$workdir = $::opt_workdir;
# Rsync treats /./ specially. We don't want that