-
Notifications
You must be signed in to change notification settings - Fork 0
/
core.clj
129 lines (116 loc) · 4.03 KB
/
core.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
(ns fdk-clj.core
(:require [clojure.string :as s]
[cheshire.core :refer :all]
[clj-time.core :as t]
[clj-time.format :as f])
(:gen-class))
(declare handle-request)
(declare handle-result)
;
;
; Start runloop. Call in your main function. As an argument
; you must provide the function that is to be called when a new
; request comes in.
; That function must take a single argument which is a map with the following layout:
; {
; :app_name "app"
; :app_route "/func"
; :call_id "id"
; :config { "FN_APP_NAME" "app" "FN_PATH" "func" ... env variables ... }
; :headers { http headers }
; :arguments {}
; :fn_format "cloudevent"
; :execution_type "sync"
; :deadline "2018-01-01T23:59:59.999"
; :content_type "application/json"
; :request_url "http://domain.com/r/app/func"
; [optional] :cloudevent { event map } ;; only applies to CloudEvent Format
; }
;
(defn handle [func-entrypoint]
(let [rdr (clojure.java.io/reader *in*)]
(doseq [line (line-seq rdr)]
(let [inp (parse-string line true)
res (handle-result (handle-request inp func-entrypoint))]
(println (generate-string res))))))
;
;
;
;
;
;
;
(defonce env
{
:app (System/getenv "FN_APP_NAME")
:path (System/getenv "FN_PATH")
:fmt (System/getenv "FN_FORMAT")
:type (System/getenv "FN_TYPE")
:config (System/getenv)
})
(defn gofmt-headers [headers]
(reduce-kv (fn [a k v] (conj a { k (into [] (flatten [v])) })) {} headers))
(defn result-cloudevent [ctx]
(merge (:cloudevent (:request ctx)) {
:contentType (get (:result ctx) :content_type "application/json")
:data (get (:result ctx) :body {})
:extensions {
:protocol {
:status_code (get (:result ctx) :status 200)
:headers (gofmt-headers (-> (ctx :result) :headers))
}}}))
(defn result-json [ctx]
(let [body (:body (:result ctx))
content-type (:content_type (:result ctx))]
{
:body (if (or (nil? content-type) (= content-type "application/json")) (generate-string body {:escape-non-ascii true}) body)
:content_type (get (:result ctx) :content_type "application/json")
:protocol {
:status_code (get (:result ctx) :status 200)
:headers (gofmt-headers (-> (ctx :result) :headers))
}
}))
(defn handle-result [ctx]
(if (= (:fmt env) "cloudevent") (result-cloudevent ctx) (result-json ctx)))
;
;
; See https://github.com/cloudevents/spec/blob/master/json-format.md
;
(defn format-cloudevent [req]
{
:call_id (-> req :eventID)
:content_type (get req :contentType "application/cloudevents+json")
:cloudevent req
:deadline (:deadline (req :extensions))
:headers (get (-> (req :extensions) :protocol) :headers {})
:request_url (get (-> (req :extensions) :protocol) :request_url (str "http://localhost:8080/r/" (:app env) "/" (:path env)))
})
(defn format-json [req]
{
:call_id (-> req :call_id)
:content_type (get req :content_type "application/json")
:deadline (:deadline req)
:headers (get (-> req :protocol) :headers {})
:request_url (get (-> req :protocol) :request_url (str "http://localhost:8080/r/" (:app env) "/" (:path env)))
})
(defn timeout [fx req]
(future-cancel fx)
{ :result {
:status 408
} :request req })
(defn handle-request [req fn-entrypoint]
(let [ctx (merge {
:app_name (:app env)
:app_route (:path env)
:config (:config env)
:fn_format (:fmt env)
:execution_type (:type env)
:arguments {}
}
(cond (= (:fmt env) "json") (format-json req)
(= (:fmt env) "cloudevent") (format-cloudevent req)
:else (throw (AssertionError. "'json' and 'cloudevent' are the only supported formats"))))
ms (t/in-millis (t/interval (t/now) (f/parse (:deadline ctx))))
fx (future (try (fn-entrypoint ctx (:data req)) (catch Exception e :exception)))
res (deref fx ms :timeout)]
(if (= res :exception) { :result { :status 500 } :request req } { :result (if (= res :timeout) (timeout fx req) res) :request req })))