Permalink
Browse files

S3 backup for Waiter Service logs on K8s (#547)

  • Loading branch information...
DaoWen authored and shamsimam committed Jan 15, 2019
1 parent 15d692d commit e156a93e0d15bf538c592c91932d552d8e678b37
@@ -1,3 +1,8 @@
.cache
__pycache__
kitchen.log

/latest
/r[0-9]
/.r[0-9].index.json
/.waiter-container-runs
@@ -13,6 +13,18 @@ start_test() {
working_dir=r$(( $(cat .waiter-container-runs) - 1 ))
}

# Run a test only when a guard condition holds.
# $1 is the test description; the remaining args form a test(1) expression
# (a single non-empty string also works, since `test STRING` is true for a
# non-empty STRING). Sets the global test_started to true/false so callers
# can gate their follow-up assertions.
start_guarded_test() {
    test_desc="$1"
    shift
    if ! test "$@"; then
        printf '\nSkipping Test %s' "$test_desc"
        test_started=false
        return
    fi
    start_test "$test_desc"
    test_started=true
}

send_sigterm() {
sleep 0.1
kill $test_pid
@@ -136,4 +148,23 @@ assert $working_dir != $old_working_dir
assert -f $working_dir/$fresh_file_name
assert $(cat $working_dir/$fresh_file_name) == hello

#
# Ensure that the stdout file captured by the wrapper script
# is uploaded to S3 when the script is terminated.
# This test runs iff WAITER_LOG_BUCKET_URL is non-null.
#
# The heredoc below is the test program fed on stdin — presumably consumed by
# start_test/the wrapper under test (TODO confirm against start_test's impl).
start_guarded_test 'stdout uploaded to s3 bucket' "$WAITER_LOG_BUCKET_URL" <<EOF
echo TestString && sleep 10
EOF
# $test_started holds the literal string true/false, executed here as a command.
if $test_started; then
# Two SIGTERMs mirror Kubernetes' double-termination protocol handled by the wrapper.
send_sigterm
send_sigterm
# 143 = 128 + 15 (SIGTERM): the wrapper's expected exit code after both signals.
await_exit_and_assert_code 143
# Logs are uploaded under <bucket>/<pod-name>/<run-dir> in the S3 bucket.
pod_name=$(hostname --short)
target_bucket_dir=$WAITER_LOG_BUCKET_URL/$pod_name/$working_dir
assert "$(curl -s $target_bucket_dir/stdout)" == TestString
# index.json is expected to list the uploaded files; size 11 = "TestString\n".
assert "$(curl -s $target_bucket_dir/index.json | jq -r '.[0].name')" == stdout
assert "$(curl -s $target_bucket_dir/index.json | jq '.[0].size')" == 11
fi

printf '\n\nAll tests passed\n'
@@ -17,7 +17,8 @@ waiter_child_pid=
# This double-termination is an important part of our Waiter scale-down logic,
# and the mechanics are described in more detail below.
# NOTE(review): this span appears to be a unified diff with the +/- markers
# stripped — it contains a literal `@@` hunk header and BOTH the old
# implementation (the `sleep 900 & ... kill %2` lines) and its replacement.
# It will not run as-is; reconcile against the actual file before use.
#
# First SIGTERM handler for Kubernetes pod deletion: forwards the signal to
# the user's process group, uploads run logs to S3 (if configured), then
# waits for a second SIGTERM before exiting with code 143.
handle_k8s_terminate() {
trap : SIGTERM # reset SIGTERM handler to no-op
waiter_2nd_sigterm=false
trap handle_2nd_k8s_terminate SIGTERM # new handler for next sigterm

# Propagate the SIGTERM to the user's app's process group,
# giving it the opportunity to shut down gracefully.
@@ -27,24 +28,75 @@ handle_k8s_terminate() {
echo 'waiter error: user process not initialized' >&2
fi

# NOTE(review): the block from here through `wait # wait for graceful...`
# looks like the OLD (removed) implementation left in by the diff scrape.
# Wait for a long time (15 minutes), awaiting another signal from Kubernetes.
# This delay gives Waiter time to safely update the desired replica count
# before the pod actually terminates, avoiding a race to replace this pod.
# If we receive a second SIGTERM from Kubernetes, then the sleep period is canceled,
# and we simply wait for the user's process to complete (or get SIGKILLed).
# The main point here is to NOT exit before the second SIGTERM is received.
# If for some reason the second SIGTERM never arrives, the sleep will eventually expire,
# or the pod's grace period will expire (resulting in a SIGKILL from Kubernetes).
# Likewise, if the user's process takes too long to terminate gracefully,
# the pod's grace period will expire (resulting in a SIGKILL from Kubernetes).
sleep 900 &
wait # wait for sleep to complete, or a second SIGTERM
kill %2 # cancel sleep
wait # wait for graceful termination of user's process
waiter_init_pid=$$

{
# Give the user's application a few seconds to gracefully exit,
# then forcefully terminate with a SIGKILL if it's still running.
sleep ${WAITER_GRACE_SECS:=3}
# Negative target after `--` signals the entire process group of the child.
kill -9 -- -$waiter_child_pid

# Wait for another signal from Kubernetes.
# This delay gives Waiter time to safely update the desired replica count
# before the pod actually terminates, avoiding a race to replace this pod.
# If we receive a second SIGTERM from Kubernetes, then the sleep period is canceled,
# and we simply wait for the user's process to complete (or get SIGKILLed).
# The main point here is to NOT exit before the second SIGTERM is received.
# If for some reason the second SIGTERM never arrives, the sleep will eventually expire,
# or the pod's grace period will expire (resulting in a SIGKILL from Kubernetes).
# Likewise, if the user's process takes too long to terminate gracefully,
# the pod's grace period will expire (resulting in a SIGKILL from Kubernetes).
# However, if we don't see the second SIGTERM after a reasonable delay,
# we assume we missed it (due to the asynchronous nature of the system),
# and that it is now safe to terminate this pod.
sleep ${WAITER_SYNC_MAX_SECS:=10}
# Presumably $$ (captured above) is this wrapper's PID, so this SIGTERM
# fires handle_2nd_k8s_terminate and unblocks the poll loop below — TODO confirm.
kill -15 $waiter_init_pid
} &

# wait for graceful termination of user's process
# kill -0 only probes existence; loop ends once the child PID is gone.
while kill -0 $waiter_child_pid; do
wait %1
done &>/dev/null

# send logs to S3 (if enabled)
if [ "$WAITER_LOG_BUCKET_URL" ]; then
# Extract this pod's name from the hostname
# (this is used to create a unique path in S3)
pod_name=$(hostname --short)
base_url="$WAITER_LOG_BUCKET_URL/$pod_name"
# For each ./r* directory created by a container restart,
# we upload the stdout and stderr to the target directory in the S3 bucket.
# We work backwards from the most recent run down to 0 to increase the odds
# that our most recent runs' logs are successfully persisted before a SIGKILL.
cd "$waiter_sandbox_base_dir"
for i in $(seq $waiter_restart_count -1 0); do
waiter_log_files='stdout stderr'
for f in $waiter_log_files; do
logfile="r$i/$f"
# Using the -T option with curl PUTs the target file to the given URL,
# and avoids loading the full file into memory when sending the payload.
curl -s -T "$logfile" "$base_url/$logfile"
done
done
fi

# wait for second sigterm to arrive
# (the flag is flipped by handle_2nd_k8s_terminate; poll in 100ms steps)
while [ $waiter_2nd_sigterm != true ]; do
sleep 0.1
done

# Exit container with code 128+15=143, indicating termination via SIGTERM.
exit 143
}

# Catch the second SIGTERM sent by Kubernetes on pod deletion.
# This double-termination is an important part of our Waiter scale-down logic,
# and the mechanics are described in more detail above (in handle_k8s_terminate).
handle_2nd_k8s_terminate() {
trap : SIGTERM # reset SIGTERM handler to no-op (`:`), so further SIGTERMs do nothing
waiter_2nd_sigterm=true # flag polled by handle_k8s_terminate's final wait loop
}

trap handle_k8s_terminate SIGTERM

# Track container restart count
@@ -53,6 +105,7 @@ echo $(( $waiter_restart_count + 1 )) > .waiter-container-runs

# Ensure that HOME is set to the fresh working directory for this container instance.
# HOME should be a symlink ./latest, which points to the new working directory.
waiter_sandbox_base_dir="$(pwd -P)"
waiter_sandbox_dir="./r${waiter_restart_count}"
# Create the per-run sandbox directory and (re)point the ./latest symlink at it.
mkdir -p "$waiter_sandbox_dir"
# -T treats 'latest' as the link name itself (never descends into an existing
# directory); quote the target expansion (SC2086) for consistency/safety.
ln -Tsf "$waiter_sandbox_dir" latest
@@ -7,5 +7,5 @@ export WAITER_FILESERVER_PORT
# Generate server config from template
envsubst </root/nginx.conf.template >/root/nginx.conf

# Start server in non-daemon mode
nginx -c /root/nginx.conf
# Start server in non-daemon mode, replacing current process
exec nginx -c /root/nginx.conf
@@ -2,8 +2,8 @@
# Usage: run-integration-tests-k8s-scheduler.sh [TEST_COMMAND] [TEST_SELECTOR]
#
# Examples:
# run-integration-tests-k8s-scheduler.sh parallel-test integration-fast
# run-integration-tests-k8s-scheduler.sh parallel-test integration-slow
# run-integration-tests-k8s-scheduler.sh parallel-test integration-heavy
# run-integration-tests-k8s-scheduler.sh parallel-test integration-lite
# run-integration-tests-k8s-scheduler.sh parallel-test
# run-integration-tests-k8s-scheduler.sh
#
@@ -21,6 +21,13 @@ KITCHEN_DIR=${WAITER_DIR}/../kitchen
# Start minikube
${DIR}/minikube-setup.sh

# Start S3 test server
# Only the "heavy" test selector exercises S3 log persistence, so the
# cloudserver container is provisioned only in that case.
if [[ $TEST_SELECTOR =~ heavy$ ]]; then
${DIR}/s3-server-setup.sh
# Address the bucket via the container's bridge IP on the container port (8000);
# the setup script also maps it to host port 8888 for local access.
S3SERVER_IP=$(docker inspect s3server | jq -r '.[0].NetworkSettings.Networks.bridge.IPAddress')
export WAITER_S3_BUCKET=http://$S3SERVER_IP:8000/waiter-service-logs
fi

# Ensure we have the docker image for the pods
${KITCHEN_DIR}/bin/build-docker-image.sh

@@ -0,0 +1,26 @@
#!/bin/bash
# Start a local S3-compatible test server (Scality cloudserver) in Docker
# and create a public read-write bucket for Waiter service logs.
# Requires: docker, curl, and pip (only if the AWS CLI is not installed).

set -e

# Install AWS CLI (for s3api commands) via pip
# We use this to handle S3 authentication for bucket creation
# https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-install.html
# `command -v` is the portable presence check; `type` would also print noise.
command -v aws >/dev/null 2>&1 || pip install awscli --upgrade --user

# Run cloudserver (S3 compatible test server) via docker
# The API server endpoint is accessible via localhost:8888
# https://hub.docker.com/r/scality/s3server
echo Starting S3 server docker container
docker run --name s3server --detach --rm --publish=8888:8000 scality/s3server:6018536a

# Poll until the server answers; give up after ~120s instead of hanging forever.
printf 'Waiting for S3 server'
attempts=0
while ! curl localhost:8888 &>/dev/null; do
    attempts=$((attempts + 1))
    if [ "$attempts" -gt 120 ]; then
        printf '\nerror: S3 server did not become ready within 120s\n' >&2
        exit 1
    fi
    printf .
    sleep 1
done
echo

# Create a public RW bucket for waiter app logs, using the default cloudserver credentials
# https://github.com/scality/cloudserver#run-it-with-a-file-backend
AWS_ACCESS_KEY_ID=accessKey1 AWS_SECRET_ACCESS_KEY=verySecretKey1 \
    aws s3api create-bucket --endpoint-url=http://localhost:8888 \
    --acl=public-read-write --bucket=waiter-service-logs --output=table
@@ -349,6 +349,14 @@
:authorizer {:kind :default
:default {:factory-fn waiter.authorization/noop-authorizer}}

;; The :log-bucket-url setting is optional, but if it's non-nil, it should be the URL (string) of an S3 bucket
;; where log files should be copied when a waiter service pod is terminated.
;; When a :log-bucket-url is given, the pod waits up to an additional :log-bucket-sync-secs before terminating
;; (starting after :pod-sigkill-delay-secs or the user process's graceful exit).
;; The log-bucket-sync-secs option should be set between 0 and 300 seconds, inclusive (default is 180).
:log-bucket-sync-secs 180
:log-bucket-url "http://s3.example.com/waiter-service-logs"

;; String value used to annotate Kubernetes objects that are orchestrated by this Waiter instantiation:
:cluster-name "waiter"

@@ -37,6 +37,8 @@
:kubernetes {:authorizer {:kind :sanity-check
:sanity-check {:factory-fn waiter.authorization/sanity-check-authorizer}}
:fileserver {:port 591}
:log-bucket-sync-secs 10
:log-bucket-url #config/env-default ["WAITER_S3_BUCKET" nil]
:url "http://localhost:8001"}}

; ---------- Error Handling ----------
@@ -1,5 +1,10 @@
(ns waiter.kubernetes-scheduler-integration-test
(:require [clojure.test :refer :all]
(:require [clojure.data.json :as json]
[clojure.set :as set]
[clojure.string :as string]
[clojure.walk :as walk]
[clojure.test :refer :all]
[clojure.tools.logging :as log]
[waiter.util.client-tools :refer :all]))

(deftest ^:parallel ^:integration-fast test-kubernetes-watch-state-update
@@ -34,3 +39,91 @@
(< initial-rs-snapshot-version initial-rs-watch-version)))
(is (<= initial-rs-snapshot-version rs-snapshot-version'))
(is (< rs-snapshot-version' rs-watch-version'))))))))

(deftest ^:parallel ^:integration-slow ^:resource-heavy test-s3-logs
  ;; End-to-end check that Waiter-on-K8s serves active instances' logs
  ;; directly, and that killed instances' logs are persisted to the
  ;; configured S3 log bucket.
  (testing-using-waiter-url
    (when (using-k8s? waiter-url)
      (let [headers {:x-waiter-name (rand-name)
                     :x-waiter-max-instances 2
                     :x-waiter-scale-up-factor 0.99
                     :x-waiter-scale-down-factor 0.99
                     :x-kitchen-delay-ms 500}
            _ (log/info "making canary request...")
            {:keys [cookies instance-id service-id]} (make-request-with-debug-info headers #(make-kitchen-request waiter-url %))
            request-fn (fn [] (->> #(make-kitchen-request waiter-url %)
                                   (make-request-with-debug-info headers)
                                   :instance-id))]
        (with-service-cleanup
          service-id
          (assert-service-on-all-routers waiter-url service-id cookies)
          ;; Get a service with at least one active and one killed instance.
          ;; This portion of the test logic was copied from basic-test/test-killed-instances
          (log/info "starting parallel requests")
          (let [instance-ids-atom (atom #{})
                instance-request-fn (fn []
                                      (let [instance-id (request-fn)]
                                        (swap! instance-ids-atom conj instance-id)))
                instance-ids (->> (parallelize-requests 4 10 instance-request-fn
                                                        :canceled? (fn [] (> (count @instance-ids-atom) 2))
                                                        :verbose true
                                                        :service-id service-id)
                                  (reduce set/union))]
            (is (> (count instance-ids) 1) (str instance-ids)))

          (log/info "waiting for at least one instance to get killed")
          (is (wait-for #(->> (get-in (service-settings waiter-url service-id) [:instances :killed-instances])
                              (map :id)
                              set
                              seq)
                        :interval 2 :timeout 45)
              (str "No killed instances found for " service-id))

          ;; Test that the active instances' logs are available.
          ;; This portion of the test logic was copied from basic-test/test-basic-logs
          (let [active-instances (get-in (service-settings waiter-url service-id :cookies cookies)
                                         [:instances :active-instances])
                log-url (:log-url (first active-instances))
                _ (log/debug "Log Url Active:" log-url)
                make-request-fn (fn [url] (make-request url "" :verbose true))
                {:keys [body] :as logs-response} (make-request-fn log-url)
                _ (assert-response-status logs-response 200)
                _ (log/debug "Response body:" body)
                log-files-list (walk/keywordize-keys (json/read-str body))
                stdout-file-link (:url (first (filter #(= (:name %) "stdout") log-files-list)))
                stderr-file-link (:url (first (filter #(= (:name %) "stderr") log-files-list)))]
            (is (every? #(string/includes? body %) ["stderr" "stdout"])
                (str "Directory listing is missing entries: stderr and stdout, got response: " logs-response))
            (doseq [file-link [stderr-file-link stdout-file-link]]
              (if (string/starts-with? (str file-link) "http")
                (assert-response-status (make-request-fn file-link) 200)
                ;; BUGFIX: log the link actually under inspection (was the
                ;; loop-invariant stdout-file-link) and name this test
                ;; (message previously said test-basic-logs).
                (log/warn "test-s3-logs did not verify file link:" file-link))))

          ;; Test that the killed instances' logs were persisted to S3.
          ;; This portion of the test logic was modified from the active-instances tests above.
          (let [log-bucket-url (k8s-log-bucket-url waiter-url)
                killed-instances (get-in (service-settings waiter-url service-id :cookies cookies)
                                         [:instances :killed-instances])
                log-url (:log-url (first killed-instances))
                make-request-fn (fn [url] (make-request url "" :verbose true))
                _ (do
                    (log/info "waiting s3 logs to appear")
                    (is (wait-for
                          #(let [{:keys [body] :as logs-response} (make-request-fn log-url)]
                             (string/includes? body log-bucket-url))
                          :interval 1 :timeout 60)
                        (str "Log URL never pointed to S3 bucket " log-bucket-url)))
                _ (log/debug "Log Url Killed:" log-url)
                {:keys [body] :as logs-response} (make-request-fn log-url)
                _ (assert-response-status logs-response 200)
                _ (log/debug "Response body:" body)
                log-files-list (walk/keywordize-keys (json/read-str body))
                stdout-file-link (:url (first (filter #(= (:name %) "stdout") log-files-list)))
                stderr-file-link (:url (first (filter #(= (:name %) "stderr") log-files-list)))]
            ;; NOTE(review): `body` is already bound above, so this wait-for
            ;; predicate is constant — it either passes immediately or spins
            ;; for the full 30s. Kept for behavior parity; consider re-fetching
            ;; inside the predicate instead.
            (is (wait-for
                  #(every? (partial string/includes? body) ["stderr" "stdout"])
                  :interval 1 :timeout 30)
                (str "Directory listing is missing entries: stderr and stdout, got response: " logs-response))
            (doseq [file-link [stderr-file-link stdout-file-link]]
              (if (string/starts-with? (str file-link) "http")
                (assert-response-status (make-request-fn file-link) 200)
                (log/warn "test-s3-logs did not verify file link:" file-link)))))))))
@@ -78,6 +78,7 @@
[org.clojure/data.codec "0.1.1"]
[org.clojure/data.json "0.2.6"]
[org.clojure/data.priority-map "0.0.10"]
[org.clojure/data.zip "0.1.2"]
[org.clojure/tools.cli "0.4.1"]
[org.clojure/tools.logging "0.4.1"]
[org.clojure/tools.namespace "0.2.11"]
Oops, something went wrong.

0 comments on commit e156a93

Please sign in to comment.