Skip to content

Commit

Permalink
sys/cmd/mapr: fix severe data loss bug when processing >4MB of data
Browse files Browse the repository at this point in the history
The way mapr divided very large input into roughly 4MB chunks, to
be processed by separate awk invocations, failed to account for
stream buffering on Unix-like systems. As a result, data was lost
in between chunks. Each time an awk invocation exited at the end of
a chunk, data from the standard input stream was lost as it was
already buffered for the awk process that exited. This made mapr
fundamentally unusable for large data input.

In this commit, this whole broken chunking notion goes away and is
replaced by asynchronous processing. mapr is now a bit slower, but
works correctly and also uses much less shell working memory.

lib/modernish/mdl/sys/cmd/mapr.mm:
- Instead of invoking multiple awk's in a very broken way, invoke
  just one awk in the background using modernish portable process
  substitution (sys/cmd/procsubst). Read from this process using a
  'while' loop that evals each line read.
  (On most shells, we could '.' the input stream. But this does not
  work on bash 3.2, which cannot read dot scripts from a non-
  regular file. It also does not work on ksh93, which reads the
  entire dot script into memory before beginning execution, so that
  mapr wouldn't work with commands writing infinite output.)

lib/modernish/aux/sys/cmd/mapr.awk:
- Remove code that was used by the broken chunking mechanism.
  • Loading branch information
McDutchie committed May 5, 2020
1 parent 7aa566e commit dae120b
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 35 deletions.
17 changes: 5 additions & 12 deletions lib/modernish/aux/sys/cmd/mapr.awk
Expand Up @@ -60,13 +60,9 @@ BEGIN {
c = 0; # count number of arguments per command invocation
L = L_cmd; # count total argument length per command invocation
tL = 0; # count total argument length per batch of commands
cont = 0; # flag for continuing with multiple awk invocations

}

NR == 1 {
NR = ENVIRON["_Msh_M_NR"] + 0; # number of records: inherit from previous batches

ORS = ""; # no automatic output record separation
shlen = 0; # count length of physical line of shell-quoted arguments
printsh("\"$@\" "); # print fixed command line argument(s)
Expand All @@ -91,12 +87,9 @@ opt_n && NR > opt_n + opt_s {

# Check the counters.
if ((opt_c && c >= opt_c) || (opt_m ? L >= opt_m : L >= arg_max)) {
# Try not to process much more than 4 megs of arguments per batch.
if ((tL += L) >= 4194304) {
cont = 1;
exit 0;
}
printsh("||_Msh_mapr_ckE \"$@\"||break;\"$@\" ");
printsh("||_Msh_mapr_ckE \"$@\"\n");
shlen = 0;
printsh("\"$@\" ");
c = 0;
L = L_cmd;
}
Expand All @@ -116,7 +109,7 @@ opt_n && NR > opt_n + opt_s {

END {
if (NR) {
printsh("||_Msh_mapr_ckE \"$@\"||break");
printsh("||_Msh_mapr_ckE \"$@\"");
}
print ("\n_Msh_M_NR=") (cont ? NR + 1 : "RET0") ("\n");
print ("\n! _Msh_M_status=0\n");
}
53 changes: 30 additions & 23 deletions lib/modernish/mdl/sys/cmd/mapr.mm
Expand Up @@ -51,6 +51,7 @@
# --- end license ---

use sys/cmd/extern
use sys/cmd/procsubst

# determine max length in bytes of arguments we can pass
_Msh_mapr_max=$(extern -p getconf ARG_MAX 2>/dev/null || putln 262144)
Expand Down Expand Up @@ -163,39 +164,45 @@

# --- main loop ---

thisshellhas BUG_EVALCOBR && _Msh_M_BUG_EVALCOBR= # provide dummy loop below within 'eval' to make 'break' work

_Msh_M_NR=1 # remember NR between awk invocations
while not str begin "${_Msh_M_NR}" "RET"; do
# Process one batch of input, producing and eval'ing commands until
# end of file or until a batch limit (quantum or length) is reached.
# Export LC_ALL=C to make awk length() count bytes, not characters.
eval "${_Msh_M_BUG_EVALCOBR+forever do }$(
export _Msh_M_NR _Msh_Mo_d _Msh_Mo_s _Msh_Mo_n _Msh_Mo_c _Msh_Mo_m \
POSIXLY_CORRECT=y LC_ALL=C "_Msh_ARG_MAX=${_Msh_mapr_max}" # BUG_NOEXPRO compat
extern -p awk -f "$MSH_AUX/sys/cmd/mapr.awk" "$@" || die "mapr: 'awk' failed"
)${_Msh_M_BUG_EVALCOBR+; break; done}"
done
unset -v _Msh_M_status
while IFS= read -r _Msh_M_cmd <&8; do
while str end "${_Msh_M_cmd}" '\'; do
# line continuation
IFS= read -r _Msh_M_cmd2 <&8 || die "mapr: internal error: line continuation failure"
_Msh_M_cmd=${_Msh_M_cmd}${CCn}${_Msh_M_cmd2}
unset -v _Msh_M_cmd2
done
eval "${_Msh_M_cmd}" 8<&- || break
done 8<$(% _Msh_mapr_doAwk "$@" 8<&0)
# ^^^^ save stdin for _Msh_mapr_doAwk() bg job
# ^^^^^ connect bg job to FD 8 using sys/cmd/procsubst

# cleanup; return with appropriate exit status

isset _Msh_M_status || die "mapr: internal error: no exit status"
eval "unset -v _Msh_Mo_P _Msh_Mo_d _Msh_Mo_n _Msh_Mo_s _Msh_Mo_c _Msh_Mo_m \
_Msh_M_ifQuantum _Msh_M_checkMax _Msh_M_NR _Msh_M_FIFO _Msh_M_i \
_Msh_M_BUG_EVALCOBR
return ${_Msh_M_NR#RET}"
_Msh_M_ifQuantum _Msh_M_checkMax _Msh_M_status _Msh_M_FIFO _Msh_M_i _Msh_M_cmd
return ${_Msh_M_status}"
}

# Helper function for running awk in the background using process substitution
_Msh_mapr_doAwk() {
# Export LC_ALL=C to make awk length() count bytes, not characters.
export _Msh_Mo_d _Msh_Mo_s _Msh_Mo_n _Msh_Mo_c _Msh_Mo_m \
POSIXLY_CORRECT=y LC_ALL=C "_Msh_ARG_MAX=${_Msh_mapr_max}" # BUG_NOEXPRO compat
extern -p awk -f "$MSH_AUX/sys/cmd/mapr.awk" "$@" <&8 || let "$? < 126 || $? == SIGPIPESTATUS" || die "mapr: 'awk' failed"
}

# Check a non-zero exit status of the callback command.
_Msh_mapr_ckE() {
_Msh_M_NR=RET$?
case ${_Msh_M_NR} in
( RET? | RET?? | RET1[01]? | RET12[012345] )
_Msh_M_status=$?
case ${_Msh_M_status} in
( ? | ?? | 1[01]? | 12[012345] )
;;
( "RET$SIGPIPESTATUS" | RET255 )
( "$SIGPIPESTATUS" | 255 )
return 1 ;;
( * ) shellquoteparams
thisshellhas --sig=${_Msh_M_NR#RET} && die "mapr: callback killed by SIG$REPLY: $@"
die "mapr: callback failed with status ${_Msh_M_NR#RET}: $@" ;;
thisshellhas --sig=${_Msh_M_status} && die "mapr: callback killed by SIG$REPLY: $@"
die "mapr: callback failed with status ${_Msh_M_status}: $@" ;;
esac
}

Expand Down

0 comments on commit dae120b

Please sign in to comment.