// index.go
package api
import (
"fmt"
"io"
"io/ioutil"
"net/http"
"path"
"path/filepath"
"strconv"
"strings"
"github.com/ncarlier/webhookd/pkg/config"
"github.com/ncarlier/webhookd/pkg/logger"
"github.com/ncarlier/webhookd/pkg/model"
"github.com/ncarlier/webhookd/pkg/worker"
)
// Handler settings shared by all requests. They are assigned by index from
// the configuration it receives and are read by the request handlers.

// defaultTimeout is the hook timeout used when a request does not carry a
// usable X-Hook-Timeout header.
var defaultTimeout int

// scriptDir is the directory passed to the script resolver; outputDir is the
// directory passed along for hook log files.
var scriptDir, outputDir string
// atoiFallback parses str as a base-10 integer. The parsed value is returned
// only when parsing succeeds and the value is strictly positive; in every
// other case (parse error, zero, negative) fallback is returned instead.
func atoiFallback(str string, fallback int) int {
	parsed, convErr := strconv.Atoi(str)
	if convErr != nil || parsed <= 0 {
		return fallback
	}
	return parsed
}
// index is the main handler of the API.
//
// It copies the hook timeout, script directory and hook log directory from
// conf into the package-level settings, then returns webhookHandler wrapped
// as an http.Handler.
func index(conf *config.Config) http.Handler {
	scriptDir, outputDir = conf.ScriptDir, conf.HookLogDir
	defaultTimeout = conf.HookTimeout

	var handler http.HandlerFunc = webhookHandler
	return handler
}
func webhookHandler(w http.ResponseWriter, r *http.Request) {
if r.Method == "GET" {
if _, err := strconv.Atoi(filepath.Base(r.URL.Path)); err == nil {
getWebhookLog(w, r)
return
}
}
triggerWebhook(w, r)
}
// triggerWebhook queues the hook named by the request path and streams the
// messages it produces back to the client.
//
// Responses:
//   - 500 when the ResponseWriter cannot flush (streaming is required);
//   - the info handler output when the path is just "/";
//   - 404 "hook not found" when the script cannot be resolved;
//   - 400 "can't read body" when the request body cannot be read;
//   - otherwise a streamed response: Server-Sent Events for GET requests,
//     plain newline-terminated lines for every other method.
//
// The timeout comes from the X-Hook-Timeout header when it holds a positive
// integer, and from the configured default otherwise.
func triggerWebhook(w http.ResponseWriter, r *http.Request) {
	// Streaming needs a writer that can flush after every message.
	flusher, canFlush := w.(http.Flusher)
	if !canFlush {
		http.Error(w, "Streaming not supported!", http.StatusInternalServerError)
		return
	}

	// The hook name is the URL path without its leading slash.
	hookName := strings.TrimPrefix(r.URL.Path, "/")
	if hookName == "" {
		infoHandler(w, r)
		return
	}

	script, err := worker.ResolveScript(scriptDir, hookName)
	if err != nil {
		logger.Error.Println(err.Error())
		http.Error(w, "hook not found", http.StatusNotFound)
		return
	}

	payload, err := ioutil.ReadAll(r.Body)
	if err != nil {
		logger.Error.Printf("error reading body: %v", err)
		http.Error(w, "can't read body", http.StatusBadRequest)
		return
	}

	// Query parameters first, then HTTP headers, as shell variables.
	shellVars := QueryParamsToShellVars(r.URL.Query())
	shellVars = append(shellVars, HTTPHeadersToShellVars(r.Header)...)
	// logger.Debug.Printf("API REQUEST: \"%s\" with params %s...\n", hookName, shellVars)

	// Build the work request and hand it over to the worker queue.
	timeout := atoiFallback(r.Header.Get("X-Hook-Timeout"), defaultTimeout)
	work := model.NewWorkRequest(hookName, script, string(payload), outputDir, shellVars, timeout)
	worker.WorkQueue <- *work

	// GET requests get an SSE stream, other methods a chunked response.
	useSSE := r.Method == http.MethodGet
	headers := w.Header()
	if useSSE {
		headers.Set("Content-Type", "text/event-stream")
	} else {
		headers.Set("X-Content-Type-Options", "nosniff")
	}
	headers.Set("Cache-Control", "no-cache")
	headers.Set("Connection", "keep-alive")
	headers.Set("X-Hook-ID", strconv.FormatUint(work.ID, 10))

	// Relay every message until the channel is closed, flushing each one
	// immediately instead of letting it sit in a buffer.
	for msg := range work.MessageChan {
		if useSSE {
			fmt.Fprintf(w, "data: %s\n\n", msg)
		} else {
			fmt.Fprintf(w, "%s\n", msg)
		}
		flusher.Flush()
	}
}
// getWebhookLog serves the log file of a previously triggered hook.
//
// The last element of the URL path is the hook ID; the remainder of the path
// (without the leading slash) is the hook name.
//
// Responses:
//   - 404 with the resolver's error text when the hook script cannot be
//     resolved;
//   - 500 with the error text when retrieving the log file fails;
//   - 404 "job not found" when no log file is returned;
//   - otherwise the log content as text/plain.
func getWebhookLog(w http.ResponseWriter, r *http.Request) {
	// Get hook ID
	id := path.Base(r.URL.Path)

	// Get script location
	name := path.Dir(strings.TrimPrefix(r.URL.Path, "/"))
	if _, err := worker.ResolveScript(scriptDir, name); err != nil {
		logger.Error.Println(err.Error())
		http.Error(w, err.Error(), http.StatusNotFound)
		return
	}

	// Retrieve log file
	logFile, err := worker.RetrieveLogFile(id, name, outputDir)
	if err != nil {
		logger.Error.Println(err.Error())
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	if logFile == nil {
		http.Error(w, "job not found", http.StatusNotFound)
		return
	}
	defer logFile.Close()

	w.Header().Set("Content-Type", "text/plain")

	// Part of the response may already be on the wire when the copy fails,
	// so the status can no longer be changed; record the failure rather
	// than dropping it silently.
	if _, err := io.Copy(w, logFile); err != nil {
		logger.Error.Printf("error sending log of hook %q (id %s): %v", name, id, err)
	}
}