Skip to content
Browse files

adds integration test for clients that cancel via process exit (#887)

* aborts backend requests using the abort-ch

* handles multiple abort attempts to a backend request

* adds integration test for clients that cancel via process exit
  • Loading branch information...
shamsimam authored and sradack committed Aug 9, 2019
1 parent 8c20b06 commit 5584aa65c56e92cf8bfa714b51a76f199bbb10c6
@@ -6,7 +6,7 @@


@@ -92,6 +92,14 @@ public void apply(final Context.CancellableContext cancellableContext,
throw new CancellationException(message);
EXIT() {
public void apply(final Context.CancellableContext cancellableContext,
final ClientCallStreamObserver<?> observer,
final String message) {
NONE() {
public void apply(final Context.CancellableContext cancellableContext,
@@ -15,6 +15,7 @@
(ns waiter.grpc-test
(:require [clojure.core.async :as async]
[ :as sh]
[clojure.string :as str]
[clojure.test :refer :all]
[ :as log]
@@ -598,6 +599,23 @@
(is (= (reduce + (map count messages)) (.getTotalLength summary)) assertion-message))
(assert-request-state grpc-client request-headers service-id correlation-id ::success)))))))))

(deftest ^:parallel ^:integration-slow test-grpc-bidi-streaming-client-exit
(when-not (behind-proxy? waiter-url)
(let [{:keys [h2c-port host request-headers service-id]} (start-courier-instance waiter-url)
{:strs [cookie]} request-headers]
(let [correlation-id (rand-name)
{:keys [err out]} (sh/sh "lein" "exec" "-p"
host (str h2c-port) service-id correlation-id cookie)]
(log/info "exec stdout:" out)
(log/info "exec stderr:" err)
(Thread/sleep 1500) ;; sleep to allow cancellation propagation to backend
(let [grpc-client (initialize-grpc-client correlation-id host h2c-port)]
(assert-request-state grpc-client request-headers service-id correlation-id ::client-cancel))))))))

(deftest ^:parallel ^:integration-fast test-grpc-client-streaming-deadline-exceeded
(let [{:keys [h2c-port host request-headers service-id]} (start-courier-instance waiter-url)
@@ -34,7 +34,7 @@
:exclusions [prismatic/schema ring/ring-core]]
[clj-jgit "0.8.10"
:scope "test"]
[twosigma/courier "1.5.9"
[twosigma/courier "1.5.14"
:exclusions [ io.grpc/grpc-core]
:scope "test"]
;; avoids the following:
@@ -114,8 +114,9 @@

:resource-paths ["resources"]
:main waiter.main
:plugins [[test2junit "1.2.2"]
[com.holychao/parallel-test "0.3.2"]]
:plugins [[com.holychao/parallel-test "0.3.2"]
[lein-exec "0.3.7"]
[test2junit "1.2.2"]]
; In case of kerberos problems, export KRB5_KTNAME=/var/spool/keytabs/$(id -un)
:jvm-opts ["-server"
@@ -0,0 +1,64 @@
;; Copyright (c) Two Sigma Open Source, LLC
;; Licensed under the Apache License, Version 2.0 (the "License");
;; you may not use this file except in compliance with the License.
;; You may obtain a copy of the License at
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns grpc.grpc-client-exit
(:require [clj-time.core :as t]
[clj-time.format :as f]
[clojure.test :refer :all]
[waiter.util.client-tools :refer :all]
[ :as du]
[waiter.util.utils :as utils])
(:import (com.twosigma.waiter.courier GrpcClient GrpcClient$CancellationPolicy)
(java.util.function Function)))

(defn- initialize-grpc-client
"Initializes grpc client logging to specific correlation id"
[correlation-id host port]
(let [date-formatter (:date-hour-minute-second-fraction f/formatters)
log-function (reify Function
(apply [_ message]
(println (du/date-to-str (t/now) date-formatter) correlation-id message)))]
(GrpcClient. host port log-function)))

(println "num command line arguments:" (count *command-line-args*))
(let [[_ host h2c-port service-id correlation-id cookie] *command-line-args*
token (str "^SERVICE-ID#" service-id)
port (Integer/parseInt h2c-port)
grpc-client (initialize-grpc-client correlation-id host port)
request-headers {"cookie" cookie
"x-cid" correlation-id
"x-waiter-token" token}
num-messages 100
ids (map #(str "id-" %) (range num-messages))
from "from"
messages (map #(str "message-" %) (range num-messages))
inter-message-sleep-ms 100
lock-step-mode true
cancel-threshold (/ num-messages 2)
cancellation-policy GrpcClient$CancellationPolicy/EXIT
deadline-duration-ms 60000]
(println "host:" host)
(println "port:" port)
(println "request-headers:" (update request-headers "cookie" utils/truncate 20))
(println "num-messages:" num-messages)
(println "cancel-threshold:" cancel-threshold)
(println "invoking GrpcClient method...")
(.collectPackages grpc-client request-headers ids from messages inter-message-sleep-ms lock-step-mode
cancel-threshold cancellation-policy deadline-duration-ms))
(catch Throwable throwable
(.printStackTrace throwable System/out))
(System/exit 0)))

0 comments on commit 5584aa6

Please sign in to comment.
You can’t perform that action at this time.