346 changes: 174 additions & 172 deletions cake-autorate.sh
Expand Up @@ -391,11 +391,13 @@ monitor_achieved_rates()

while true
do
unset command
read -r -t "${sleep_duration_s}" -u "${monitor_achieved_rates_fd}" -a command
if [[ "${command-}" ]]
then
case "${command[0]}" in
t_start_us="${EPOCHREALTIME/./}"

while read -r -t 0 -u "${monitor_achieved_rates_fd}"
do
unset command
read -r -u "${monitor_achieved_rates_fd}" -a command
case "${command[0]:-}" in

SET_VAR)
if [[ "${command[1]:-}" && "${command[2]:-}" ]]
Expand All @@ -411,45 +413,43 @@ monitor_achieved_rates()
:
;;
esac
else
t_start_us="${EPOCHREALTIME/./}"

# If rx/tx bytes file exists, read it in, otherwise set to prev_bytes
# This addresses interfaces going down and back up
[[ -f "${rx_bytes_path}" ]] && { read -r rx_bytes < "${rx_bytes_path}"; } 2> /dev/null || rx_bytes="${prev_rx_bytes}"
[[ -f "${tx_bytes_path}" ]] && { read -r tx_bytes < "${tx_bytes_path}"; } 2> /dev/null || tx_bytes="${prev_tx_bytes}"
done

dl_achieved_rate_kbps=$(( ((8000*(rx_bytes - prev_rx_bytes)) / compensated_monitor_achieved_rates_interval_us ) ))
ul_achieved_rate_kbps=$(( ((8000*(tx_bytes - prev_tx_bytes)) / compensated_monitor_achieved_rates_interval_us ) ))
# If rx/tx bytes file exists, read it in, otherwise set to prev_bytes
# This addresses interfaces going down and back up
[[ -f "${rx_bytes_path}" ]] && { read -r rx_bytes < "${rx_bytes_path}"; } 2> /dev/null || rx_bytes="${prev_rx_bytes}"
[[ -f "${tx_bytes_path}" ]] && { read -r tx_bytes < "${tx_bytes_path}"; } 2> /dev/null || tx_bytes="${prev_tx_bytes}"

((dl_achieved_rate_kbps<0)) && dl_achieved_rate_kbps=0
((ul_achieved_rate_kbps<0)) && ul_achieved_rate_kbps=0
dl_achieved_rate_kbps=$(( ((8000*(rx_bytes - prev_rx_bytes)) / compensated_monitor_achieved_rates_interval_us ) ))
ul_achieved_rate_kbps=$(( ((8000*(tx_bytes - prev_tx_bytes)) / compensated_monitor_achieved_rates_interval_us ) ))

printf "SET_VAR dl_achieved_rate_kbps %s\n" "${dl_achieved_rate_kbps}" >&"${main_fd}"
printf "SET_VAR ul_achieved_rate_kbps %s\n" "${ul_achieved_rate_kbps}" >&"${main_fd}"
((dl_achieved_rate_kbps<0)) && dl_achieved_rate_kbps=0
((ul_achieved_rate_kbps<0)) && ul_achieved_rate_kbps=0

dl_load_percent=$(( (100*dl_achieved_rate_kbps)/dl_shaper_rate_kbps ))
ul_load_percent=$(( (100*ul_achieved_rate_kbps)/ul_shaper_rate_kbps ))

for pinger_fd in "${pinger_fds[@]}"
do
printf "SET_VAR dl_load_percent %s\n" "${dl_load_percent}" >&"${pinger_fd}"
printf "SET_VAR ul_load_percent %s\n" "${ul_load_percent}" >&"${pinger_fd}"
done

if ((output_load_stats)); then
printf "SET_VAR dl_achieved_rate_kbps %s\n" "${dl_achieved_rate_kbps}" >&"${main_fd}"
printf "SET_VAR ul_achieved_rate_kbps %s\n" "${ul_achieved_rate_kbps}" >&"${main_fd}"

printf -v load_stats '%s; %s; %s; %s; %s' "${EPOCHREALTIME}" "${dl_achieved_rate_kbps}" "${ul_achieved_rate_kbps}" "${dl_shaper_rate_kbps}" "${ul_shaper_rate_kbps}"
log_msg "LOAD" "${load_stats}"
fi
dl_load_percent=$(( (100*dl_achieved_rate_kbps)/dl_shaper_rate_kbps ))
ul_load_percent=$(( (100*ul_achieved_rate_kbps)/ul_shaper_rate_kbps ))

for pinger_fd in "${pinger_fds[@]}"
do
printf "SET_VAR dl_load_percent %s\n" "${dl_load_percent}" >&"${pinger_fd}"
printf "SET_VAR ul_load_percent %s\n" "${ul_load_percent}" >&"${pinger_fd}"
done

prev_rx_bytes="${rx_bytes}"
prev_tx_bytes="${tx_bytes}"
if ((output_load_stats)); then

compensated_monitor_achieved_rates_interval_us=$(( monitor_achieved_rates_interval_us>(10*max_wire_packet_rtt_us) ? monitor_achieved_rates_interval_us : 10*max_wire_packet_rtt_us ))
printf -v load_stats '%s; %s; %s; %s; %s' "${EPOCHREALTIME}" "${dl_achieved_rate_kbps}" "${ul_achieved_rate_kbps}" "${dl_shaper_rate_kbps}" "${ul_shaper_rate_kbps}"
log_msg "LOAD" "${load_stats}"
fi

get_remaining_tick_time "${t_start_us}" "${compensated_monitor_achieved_rates_interval_us}"
prev_rx_bytes="${rx_bytes}"
prev_tx_bytes="${tx_bytes}"

compensated_monitor_achieved_rates_interval_us=$(( monitor_achieved_rates_interval_us>(10*max_wire_packet_rtt_us) ? monitor_achieved_rates_interval_us : 10*max_wire_packet_rtt_us ))

sleep_remaining_tick_time "${t_start_us}" "${compensated_monitor_achieved_rates_interval_us}"

done
}
Expand Down Expand Up @@ -1196,16 +1196,21 @@ maintain_pingers()
# Reflector maintenance loop - verifies reflectors have not gone stale and rotates reflectors as necessary
while true
do
unset command
read -r -t "${sleep_duration_s}" -u "${maintain_pingers_fd}" -a command
if [[ "${command:-}" ]]
then
case "${command[0]}" in
t_start_us="${EPOCHREALTIME/./}"

while read -r -t 0 -u "${maintain_pingers_fd}"
do
unset command
read -r -u "${maintain_pingers_fd}" -a command
case "${command[0]:-}" in

CHANGE_STATE)
CHANGE_STATE)
if [[ "${command[1]:-}" ]]
then
change_state_maintain_pingers "${command[1]}"
# break out of reading any new IPC commands to handle next state
# since next state might be to start or stop pingers
break
fi
;;
SET_ARRAY_ELEMENT)
Expand All @@ -1215,158 +1220,155 @@ maintain_pingers()
fi
;;
SET_VAR)
if [[ "${command[1]:-}" && "${command[2]:-}" ]]
then
export -n "${command[1]}=${command[2]}"
fi
;;
TERMINATE)
log_msg "DEBUG" "Terminating monitor_achieved_rates."
exit
;;
*)
:
;;
esac
else

t_start_us="${EPOCHREALTIME/./}"

case "${maintain_pingers_state}" in

START)
if ((pingers_active==0))
if [[ "${command[1]:-}" && "${command[2]:-}" ]]
then
start_pingers
pingers_active=1
export -n "${command[1]}=${command[2]}"
fi
change_state_maintain_pingers "RUNNING"
;;

STOP)
if ((pingers_active))
then
kill_pingers
pingers_active=0
fi
change_state_maintain_pingers "PAUSED"
TERMINATE)
log_msg "DEBUG" "Terminating monitor_achieved_rates."
exit
;;

PAUSED)
*)
:
;;

RUNNING)

if (( ${t_start_us}>(t_last_reflector_replacement_us+reflector_replacement_interval_mins*60*1000000) ))
then
pinger=$((RANDOM%no_pingers))
log_msg "DEBUG" "reflector: ${reflectors[pinger]} randomly selected for replacement."
replace_pinger_reflector "${pinger}"
t_last_reflector_replacement_us=${EPOCHREALTIME/./}
continue
fi

if (( ${t_start_us}>(t_last_reflector_comparison_us+reflector_comparison_interval_mins*60*1000000) )); then

t_last_reflector_comparison_us=${EPOCHREALTIME/./}
esac
done

case "${maintain_pingers_state}" in

[[ "${dl_owd_baselines_us[${reflectors[0]}]:-}" && "${dl_owd_baselines_us[${reflectors[0]}]:-}" && "${ul_owd_baselines_us[${reflectors[0]}]:-}" && "${ul_owd_baselines_us[${reflectors[0]}]:-}" ]] || continue
START)
if ((pingers_active==0))
then
start_pingers
pingers_active=1
fi
change_state_maintain_pingers "RUNNING"
;;

min_sum_owd_baselines_us=$(( dl_owd_baselines_us[${reflectors[0]}] + ul_owd_baselines_us[${reflectors[0]}] ))
min_dl_owd_delta_ewma_us="${dl_owd_delta_ewmas_us[${reflectors[0]}]}"
min_ul_owd_delta_ewma_us="${ul_owd_delta_ewmas_us[${reflectors[0]}]}"
STOP)
if ((pingers_active))
then
kill_pingers
pingers_active=0
fi
change_state_maintain_pingers "PAUSED"
;;

PAUSED)
;;

RUNNING)

for ((pinger=0; pinger < no_pingers; pinger++))
do
[[ "${dl_owd_baselines_us[${reflectors[pinger]}]:-}" && "${dl_owd_delta_ewmas_us[${reflectors[pinger]}]:-}" && "${ul_owd_baselines_us[${reflectors[pinger]}]:-}" && "${ul_owd_delta_ewmas_us[${reflectors[pinger]}]:-}" ]] || continue 2
if (( ${t_start_us}>(t_last_reflector_replacement_us+reflector_replacement_interval_mins*60*1000000) ))
then
pinger=$((RANDOM%no_pingers))
log_msg "DEBUG" "reflector: ${reflectors[pinger]} randomly selected for replacement."
replace_pinger_reflector "${pinger}"
t_last_reflector_replacement_us=${EPOCHREALTIME/./}
continue
fi

sum_owd_baselines_us[pinger]=$(( dl_owd_baselines_us[${reflectors[pinger]}] + ul_owd_baselines_us[${reflectors[pinger]}] ))
(( sum_owd_baselines_us[pinger] < min_sum_owd_baselines_us )) && min_sum_owd_baselines_us="${sum_owd_baselines_us[pinger]}"
(( dl_owd_delta_ewmas_us[${reflectors[pinger]}] < min_dl_owd_delta_ewma_us )) && min_dl_owd_delta_ewma_us="${dl_owd_delta_ewmas_us[${reflectors[pinger]}]}"
(( ul_owd_delta_ewmas_us[${reflectors[pinger]}] < min_ul_owd_delta_ewma_us )) && min_ul_owd_delta_ewma_us="${ul_owd_delta_ewmas_us[${reflectors[pinger]}]}"
done
if (( ${t_start_us}>(t_last_reflector_comparison_us+reflector_comparison_interval_mins*60*1000000) )); then

for ((pinger=0; pinger < no_pingers; pinger++))
do
t_last_reflector_comparison_us=${EPOCHREALTIME/./}

sum_owd_baselines_delta_us=$(( sum_owd_baselines_us[pinger] - min_sum_owd_baselines_us ))
dl_owd_delta_ewma_delta_us=$(( dl_owd_delta_ewmas_us[${reflectors[pinger]}] - min_dl_owd_delta_ewma_us ))
ul_owd_delta_ewma_delta_us=$(( ul_owd_delta_ewmas_us[${reflectors[pinger]}] - min_ul_owd_delta_ewma_us ))
[[ "${dl_owd_baselines_us[${reflectors[0]}]:-}" && "${dl_owd_baselines_us[${reflectors[0]}]:-}" && "${ul_owd_baselines_us[${reflectors[0]}]:-}" && "${ul_owd_baselines_us[${reflectors[0]}]:-}" ]] || continue

if ((output_reflector_stats))
then
printf -v reflector_stats '%s; %s; %s; %s; %s; %s; %s; %s; %s; %s; %s; %s; %s; %s' "${EPOCHREALTIME}" "${reflectors[pinger]}" "${min_sum_owd_baselines_us}" "${sum_owd_baselines_us[pinger]}" "${sum_owd_baselines_delta_us}" "${reflector_sum_owd_baselines_delta_thr_us}" "${min_dl_owd_delta_ewma_us}" "${dl_owd_delta_ewmas_us[${reflectors[pinger]}]}" "${dl_owd_delta_ewma_delta_us}" "${reflector_owd_delta_ewma_delta_thr_us}" "${min_ul_owd_delta_ewma_us}" "${ul_owd_delta_ewmas_us[${reflectors[pinger]}]}" "${ul_owd_delta_ewma_delta_us}" "${reflector_owd_delta_ewma_delta_thr_us}"
log_msg "REFLECTOR" "${reflector_stats}"
fi

if (( sum_owd_baselines_delta_us > reflector_sum_owd_baselines_delta_thr_us ))
then
log_msg "DEBUG" "Warning: reflector: ${reflectors[pinger]} sum_owd_baselines_us exceeds the minimum by set threshold."
replace_pinger_reflector "${pinger}"
continue 2
fi

if (( dl_owd_delta_ewma_delta_us > reflector_owd_delta_ewma_delta_thr_us ))
then
log_msg "DEBUG" "Warning: reflector: ${reflectors[pinger]} dl_owd_delta_ewma_us exceeds the minimum by set threshold."
replace_pinger_reflector "${pinger}"
continue 2
fi

if (( ul_owd_delta_ewma_delta_us > reflector_owd_delta_ewma_delta_thr_us ))
then
log_msg "DEBUG" "Warning: reflector: ${reflectors[pinger]} ul_owd_delta_ewma_us exceeds the minimum by set threshold."
replace_pinger_reflector "${pinger}"
continue 2
fi
done
min_sum_owd_baselines_us=$(( dl_owd_baselines_us[${reflectors[0]}] + ul_owd_baselines_us[${reflectors[0]}] ))
min_dl_owd_delta_ewma_us="${dl_owd_delta_ewmas_us[${reflectors[0]}]}"
min_ul_owd_delta_ewma_us="${ul_owd_delta_ewmas_us[${reflectors[0]}]}"

fi
for ((pinger=0; pinger < no_pingers; pinger++))
do
[[ "${dl_owd_baselines_us[${reflectors[pinger]}]:-}" && "${dl_owd_delta_ewmas_us[${reflectors[pinger]}]:-}" && "${ul_owd_baselines_us[${reflectors[pinger]}]:-}" && "${ul_owd_delta_ewmas_us[${reflectors[pinger]}]:-}" ]] || continue 2

enable_replace_pinger_reflector=1
sum_owd_baselines_us[pinger]=$(( dl_owd_baselines_us[${reflectors[pinger]}] + ul_owd_baselines_us[${reflectors[pinger]}] ))
(( sum_owd_baselines_us[pinger] < min_sum_owd_baselines_us )) && min_sum_owd_baselines_us="${sum_owd_baselines_us[pinger]}"
(( dl_owd_delta_ewmas_us[${reflectors[pinger]}] < min_dl_owd_delta_ewma_us )) && min_dl_owd_delta_ewma_us="${dl_owd_delta_ewmas_us[${reflectors[pinger]}]}"
(( ul_owd_delta_ewmas_us[${reflectors[pinger]}] < min_ul_owd_delta_ewma_us )) && min_ul_owd_delta_ewma_us="${ul_owd_delta_ewmas_us[${reflectors[pinger]}]}"
done

for ((pinger=0; pinger < no_pingers; pinger++))
do
reflector_check_time_us="${EPOCHREALTIME/./}"
reflector_last_timestamp_us="${reflector_last_timestamps_us["${reflectors[pinger]}"]}"
# shellcheck disable=SC2178
declare -n reflector_offences="reflector_${pinger}_offences"
(( reflector_offences[reflector_offences_idx] )) && ((sum_reflector_offences[pinger]--))
# shellcheck disable=SC2154
reflector_offences[reflector_offences_idx]=$(( (((reflector_check_time_us-reflector_last_timestamp_us) > reflector_response_deadline_us)) ? 1 : 0 ))
if (( reflector_offences[reflector_offences_idx] )); then
((sum_reflector_offences[pinger]++))
log_msg "DEBUG" "no ping response from reflector: ${reflectors[pinger]} within reflector_response_deadline: ${reflector_response_deadline_s}s"
log_msg "DEBUG" "reflector=${reflectors[pinger]}, sum_reflector_offences=${sum_reflector_offences[pinger]} and reflector_misbehaving_detection_thr=${reflector_misbehaving_detection_thr}"

sum_owd_baselines_delta_us=$(( sum_owd_baselines_us[pinger] - min_sum_owd_baselines_us ))
dl_owd_delta_ewma_delta_us=$(( dl_owd_delta_ewmas_us[${reflectors[pinger]}] - min_dl_owd_delta_ewma_us ))
ul_owd_delta_ewma_delta_us=$(( ul_owd_delta_ewmas_us[${reflectors[pinger]}] - min_ul_owd_delta_ewma_us ))

if ((output_reflector_stats))
then
printf -v reflector_stats '%s; %s; %s; %s; %s; %s; %s; %s; %s; %s; %s; %s; %s; %s' "${EPOCHREALTIME}" "${reflectors[pinger]}" "${min_sum_owd_baselines_us}" "${sum_owd_baselines_us[pinger]}" "${sum_owd_baselines_delta_us}" "${reflector_sum_owd_baselines_delta_thr_us}" "${min_dl_owd_delta_ewma_us}" "${dl_owd_delta_ewmas_us[${reflectors[pinger]}]}" "${dl_owd_delta_ewma_delta_us}" "${reflector_owd_delta_ewma_delta_thr_us}" "${min_ul_owd_delta_ewma_us}" "${ul_owd_delta_ewmas_us[${reflectors[pinger]}]}" "${ul_owd_delta_ewma_delta_us}" "${reflector_owd_delta_ewma_delta_thr_us}"
log_msg "REFLECTOR" "${reflector_stats}"
fi

if (( sum_owd_baselines_delta_us > reflector_sum_owd_baselines_delta_thr_us ))
then
log_msg "DEBUG" "Warning: reflector: ${reflectors[pinger]} sum_owd_baselines_us exceeds the minimum by set threshold."
replace_pinger_reflector "${pinger}"
continue 2
fi

if (( sum_reflector_offences[pinger] >= reflector_misbehaving_detection_thr )); then
log_msg "DEBUG" "Warning: reflector: ${reflectors[pinger]} seems to be misbehaving."
if ((enable_replace_pinger_reflector)); then
replace_pinger_reflector "${pinger}"
for ((i=0; i<reflector_misbehaving_detection_window; i++)) do reflector_offences[i]=0; done
sum_reflector_offences[pinger]=0
enable_replace_pinger_reflector=0
else
log_msg "DEBUG" "Warning: skipping replacement of reflector: ${reflectors[pinger]} given prior replacement within this reflector health check cycle."
fi
fi
if (( dl_owd_delta_ewma_delta_us > reflector_owd_delta_ewma_delta_thr_us ))
then
log_msg "DEBUG" "Warning: reflector: ${reflectors[pinger]} dl_owd_delta_ewma_us exceeds the minimum by set threshold."
replace_pinger_reflector "${pinger}"
continue 2
fi

if (( ul_owd_delta_ewma_delta_us > reflector_owd_delta_ewma_delta_thr_us ))
then
log_msg "DEBUG" "Warning: reflector: ${reflectors[pinger]} ul_owd_delta_ewma_us exceeds the minimum by set threshold."
replace_pinger_reflector "${pinger}"
continue 2
fi
done
((reflector_offences_idx=(reflector_offences_idx+1)%reflector_misbehaving_detection_window))
;;
*)
log_msg "ERROR" "Unrecognized maintain pingers state: ${maintain_pingers_state}."
log_msg "ERROR" "Setting state to RUNNING"
maintain_pingers_next_state="RUNNING"
change_maintain_pingers_state

fi

enable_replace_pinger_reflector=1

for ((pinger=0; pinger < no_pingers; pinger++))
do
reflector_check_time_us="${EPOCHREALTIME/./}"
reflector_last_timestamp_us="${reflector_last_timestamps_us["${reflectors[pinger]}"]}"
# shellcheck disable=SC2178
declare -n reflector_offences="reflector_${pinger}_offences"
(( reflector_offences[reflector_offences_idx] )) && ((sum_reflector_offences[pinger]--))
# shellcheck disable=SC2154
reflector_offences[reflector_offences_idx]=$(( (((reflector_check_time_us-reflector_last_timestamp_us) > reflector_response_deadline_us)) ? 1 : 0 ))
if (( reflector_offences[reflector_offences_idx] )); then
((sum_reflector_offences[pinger]++))
log_msg "DEBUG" "no ping response from reflector: ${reflectors[pinger]} within reflector_response_deadline: ${reflector_response_deadline_s}s"
log_msg "DEBUG" "reflector=${reflectors[pinger]}, sum_reflector_offences=${sum_reflector_offences[pinger]} and reflector_misbehaving_detection_thr=${reflector_misbehaving_detection_thr}"
fi
if (( sum_reflector_offences[pinger] >= reflector_misbehaving_detection_thr )); then
log_msg "DEBUG" "Warning: reflector: ${reflectors[pinger]} seems to be misbehaving."
if ((enable_replace_pinger_reflector)); then
replace_pinger_reflector "${pinger}"
for ((i=0; i<reflector_misbehaving_detection_window; i++)) do reflector_offences[i]=0; done
sum_reflector_offences[pinger]=0
enable_replace_pinger_reflector=0
else
log_msg "DEBUG" "Warning: skipping replacement of reflector: ${reflectors[pinger]} given prior replacement within this reflector health check cycle."
fi
fi
done
((reflector_offences_idx=(reflector_offences_idx+1)%reflector_misbehaving_detection_window))
;;
esac
fi
get_remaining_tick_time "${t_start_us}" "${reflector_health_check_interval_us}"
*)
log_msg "ERROR" "Unrecognized maintain pingers state: ${maintain_pingers_state}."
log_msg "ERROR" "Setting state to RUNNING"
maintain_pingers_next_state="RUNNING"
change_maintain_pingers_state
;;
esac
sleep_remaining_tick_time "${t_start_us}" "${reflector_health_check_interval_us}"
done
}
Expand Down