Skip to content

Commit

Permalink
Respawn Pure async worker on crash (#543)
Browse files Browse the repository at this point in the history
  • Loading branch information
mafredri committed May 1, 2020
1 parent 3589b7f commit 81dd496
Show file tree
Hide file tree
Showing 2 changed files with 164 additions and 63 deletions.
176 changes: 126 additions & 50 deletions async.zsh
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
#
# zsh-async
#
# version: 1.7.2
# version: 1.8.0
# author: Mathias Fredriksson
# url: https://github.com/mafredri/zsh-async
#

typeset -g ASYNC_VERSION=1.7.2
typeset -g ASYNC_VERSION=1.8.0
# Produce debug output from zsh-async when set to 1.
typeset -g ASYNC_DEBUG=${ASYNC_DEBUG:-0}

Expand Down Expand Up @@ -37,19 +37,27 @@ _async_job() {
# block, after the command block has completed, the stdin for `cat` is
# closed, causing stderr to be appended with a $'\0' at the end to mark the
# end of output from this job.
local jobname=${ASYNC_JOB_NAME:-$1}
local stdout stderr ret tok
{
stdout=$(eval "$@")
ret=$?
duration=$(( EPOCHREALTIME - duration )) # Calculate duration.
local jobname=${ASYNC_JOB_NAME:-$1} out
out="$(
local stdout stderr ret tok
{
stdout=$(eval "$@")
ret=$?
duration=$(( EPOCHREALTIME - duration )) # Calculate duration.
print -r -n - $'\0'${(q)jobname} $ret ${(q)stdout} $duration
} 2> >(stderr=$(cat) && print -r -n - " "${(q)stderr}$'\0')
)"
if [[ $out != $'\0'*$'\0' ]]; then
# Corrupted output (aborted job?), skipping.
return
fi

# Grab mutex lock, stalls until token is available.
read -r -k 1 -p tok || exit 1
# Grab mutex lock, stalls until token is available.
read -r -k 1 -p tok || return 1

# Return output (<job_name> <return_code> <stdout> <duration> <stderr>).
print -r -n - $'\0'${(q)jobname} $ret ${(q)stdout} $duration
} 2> >(stderr=$(cat) && print -r -n - " "${(q)stderr}$'\0')
# Return output (<job_name> <return_code> <stdout> <duration> <stderr>).
print -r -n - "$out"

# Unlock mutex by inserting a token.
print -n -p $tok
Expand All @@ -73,10 +81,13 @@ _async_worker() {
# When a zpty is deleted (using -d) all the zpty instances created before
# the one being deleted receive a SIGHUP, unless we catch it, the async
# worker would simply exit (stop working) even though visible in the list
# of zpty's (zpty -L).
TRAPHUP() {
return 0 # Return 0, indicating signal was handled.
}
# of zpty's (zpty -L). This has been fixed around the time of Zsh 5.4
# (not released).
if ! is-at-least 5.4.1; then
TRAPHUP() {
return 0 # Return 0, indicating signal was handled.
}
fi

local -A storage
local unique=0
Expand Down Expand Up @@ -121,15 +132,33 @@ _async_worker() {
# Register a SIGCHLD trap to handle the completion of child processes.
trap child_exit CHLD

# Process option parameters passed to worker
while getopts "np:u" opt; do
# Process option parameters passed to worker.
while getopts "np:uz" opt; do
case $opt in
n) notify_parent=1;;
p) parent_pid=$OPTARG;;
u) unique=1;;
z) notify_parent=0;; # Uses ZLE watcher instead.
esac
done

# Terminate all running jobs, note that this function does not
# reinstall the child trap.
terminate_jobs() {
trap - CHLD # Ignore child exits during kill.
coproc : # Quit coproc.
coproc_pid=0 # Reset pid.

if is-at-least 5.4.1; then
trap '' HUP # Catch the HUP sent to this process.
kill -HUP -$$ # Send to entire process group.
trap - HUP # Disable HUP trap.
else
# We already handle HUP for Zsh < 5.4.1.
kill -HUP -$$ # Send to entire process group.
fi
}

killjobs() {
local tok
local -a pids
Expand All @@ -143,27 +172,36 @@ _async_worker() {
# process is in the middle of writing to stdin during kill.
(( coproc_pid )) && read -r -k 1 -p tok

kill -HUP -$$ # Send to entire process group.
coproc : # Quit coproc.
coproc_pid=0 # Reset pid.
terminate_jobs
trap child_exit CHLD # Reinstall child trap.
}

local request do_eval=0
local -a cmd
while :; do
# Wait for jobs sent by async_job.
read -r -d $'\0' request || {
# Since we handle SIGHUP above (and thus do not know when `zpty -d`)
# occurs, a failure to read probably indicates that stdin has
# closed. This is why we propagate the signal to all children and
# exit manually.
kill -HUP -$$ # Send SIGHUP to all jobs.
exit 0
# Unknown error occurred while reading from stdin, the zpty
# worker is likely in a broken state, so we shut down.
terminate_jobs

# Stdin is broken and in case this was an unintended
# crash, we try to report it as a last hurrah.
print -r -n $'\0'"'[async]'" $(( 127 + 3 )) "''" 0 "'$0:$LINENO: zpty fd died, exiting'"$'\0'

# We use `return` to abort here because using `exit` may
# result in an infinite loop that never exits and, as a
# result, high CPU utilization.
return $(( 127 + 1 ))
}

# We need to clean the input here because sometimes when a zpty
# has died and been respawned, messages will be prefixed with a
# carraige return (\r, or \C-M).
request=${request#$'\C-M'}

# Check for non-job commands sent to worker
case $request in
_unset_trap) notify_parent=0; continue;;
_killjobs) killjobs; continue;;
_async_eval*) do_eval=1;;
esac
Expand All @@ -175,9 +213,11 @@ _async_worker() {
# Name of the job (first argument).
local job=$cmd[1]

# If worker should perform unique jobs
if (( unique )); then
# Check if a previous job is still running, if yes, let it finnish
# Check if a worker should perform unique jobs, unless
# this is an eval since they run synchronously.
if (( !do_eval )) && (( unique )); then
# Check if a previous job is still running, if yes,
# skip this job and let the previous one finish.
for pid in ${${(v)jobstates##*:*:}%\=*}; do
if [[ ${storage[$job]} == $pid ]]; then
continue 2
Expand Down Expand Up @@ -317,7 +357,7 @@ _async_zle_watcher() {
async_stop_worker $worker

if [[ -n $callback ]]; then
$callback '[async]' 2 "" 0 "$worker:zle -F $1 returned error $2" 0
$callback '[async]' 2 "" 0 "$0:$LINENO: error: fd for $worker failed: zle -F $1 returned error $2" 0
fi
return
fi;
Expand All @@ -327,6 +367,28 @@ _async_zle_watcher() {
fi
}

_async_send_job() {
setopt localoptions noshwordsplit noksharrays noposixidentifiers noposixstrings

local caller=$1
local worker=$2
shift 2

zpty -t $worker &>/dev/null || {
typeset -gA ASYNC_CALLBACKS
local callback=$ASYNC_CALLBACKS[$worker]

if [[ -n $callback ]]; then
$callback '[async]' 3 "" 0 "$0:$LINENO: error: no such worker: $worker" 0
else
print -u2 "$caller: no such async worker: $worker"
fi
return 1
}

zpty -w $worker "$@"$'\0'
}

#
# Start a new asynchronous job on specified worker, assumes the worker is running.
#
Expand All @@ -344,8 +406,7 @@ async_job() {
cmd=(${(q)cmd}) # Quote special characters in multi argument commands.
fi

# Quote the cmd in case RC_EXPAND_PARAM is set.
zpty -w $worker "$cmd"$'\0'
_async_send_job $0 $worker "$cmd"
}

#
Expand All @@ -369,7 +430,7 @@ async_worker_eval() {
fi

# Quote the cmd in case RC_EXPAND_PARAM is set.
zpty -w $worker "_async_eval $cmd"$'\0'
_async_send_job $0 $worker "_async_eval $cmd"
}

# This function traps notification signals and calls all registered callbacks
Expand All @@ -392,7 +453,7 @@ _async_notify_trap() {
async_register_callback() {
setopt localoptions noshwordsplit nolocaltraps

typeset -gA ASYNC_CALLBACKS
typeset -gA ASYNC_PTYS ASYNC_CALLBACKS
local worker=$1; shift

ASYNC_CALLBACKS[$worker]="$*"
Expand All @@ -401,6 +462,14 @@ async_register_callback() {
# workers to notify (via -n) when a job is done.
if [[ ! -o interactive ]] || [[ ! -o zle ]]; then
trap '_async_notify_trap' WINCH
elif [[ -o interactive ]] && [[ -o zle ]]; then
local fd w
for fd w in ${(@kv)ASYNC_PTYS}; do
if [[ $w == $worker ]]; then
zle -F $fd _async_zle_watcher # Register the ZLE handler.
break
fi
done
fi
}

Expand Down Expand Up @@ -465,6 +534,8 @@ async_start_worker() {
setopt localoptions noshwordsplit

local worker=$1; shift
local -a args
args=("$@")
zpty -t $worker &>/dev/null && return

typeset -gA ASYNC_PTYS
Expand All @@ -478,23 +549,29 @@ async_start_worker() {
unsetopt xtrace
}

if (( ! ASYNC_ZPTY_RETURNS_FD )) && [[ -o interactive ]] && [[ -o zle ]]; then
# When zpty doesn't return a file descriptor (on older versions of zsh)
# we try to guess it anyway.
integer -l zptyfd
exec {zptyfd}>&1 # Open a new file descriptor (above 10).
exec {zptyfd}>&- # Close it so it's free to be used by zpty.
if [[ -o interactive ]] && [[ -o zle ]]; then
# Inform the worker to ignore the notify flag and that we're
# using a ZLE watcher instead.
args+=(-z)

if (( ! ASYNC_ZPTY_RETURNS_FD )); then
# When zpty doesn't return a file descriptor (on older versions of zsh)
# we try to guess it anyway.
integer -l zptyfd
exec {zptyfd}>&1 # Open a new file descriptor (above 10).
exec {zptyfd}>&- # Close it so it's free to be used by zpty.
fi
fi

zpty -b $worker _async_worker -p $$ $@ || {
zpty -b $worker _async_worker -p $$ $args || {
async_stop_worker $worker
return 1
}

# Re-enable it if it was enabled, for debugging.
(( has_xtrace )) && setopt xtrace

if [[ $ZSH_VERSION < 5.0.8 ]]; then
if ! is-at-least 5.0.8; then
# For ZSH versions older than 5.0.8 we delay a bit to give
# time for the worker to start before issuing commands,
# otherwise it will not be ready to receive them.
Expand All @@ -506,11 +583,7 @@ async_start_worker() {
REPLY=$zptyfd # Use the guessed value for the file desciptor.
fi

ASYNC_PTYS[$REPLY]=$worker # Map the file desciptor to the worker.
zle -F $REPLY _async_zle_watcher # Register the ZLE handler.

# Disable trap in favor of ZLE handler when notify is enabled (-n).
async_job $worker _unset_trap
ASYNC_PTYS[$REPLY]=$worker # Map the file desciptor to the worker.
fi
}

Expand Down Expand Up @@ -556,6 +629,9 @@ async_init() {
zmodload zsh/zpty
zmodload zsh/datetime

# Load is-at-least for reliable version check.
autoload -Uz is-at-least

# Check if zsh/zpty returns a file descriptor or not,
# shell must also be interactive with zle enabled.
typeset -g ASYNC_ZPTY_RETURNS_FD=0
Expand Down
Loading

0 comments on commit 81dd496

Please sign in to comment.