/
action_request.ts
412 lines (378 loc) · 13.6 KB
/
action_request.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
import * as express from "express"
import * as oboe from "oboe"
import * as httpRequest from "request"
import * as sanitizeFilename from "sanitize-filename"
import * as semver from "semver"
import { PassThrough, Readable } from "stream"
import * as winston from "winston"
import { truncateString } from "./utils"
import {
DataWebhookPayload,
DataWebhookPayloadType as ActionType,
} from "../api_types/data_webhook_payload"
import { DataWebhookPayloadScheduledPlanType } from "../api_types/data_webhook_payload_scheduled_plan"
import {
IntegrationSupportedDownloadSettings as ActionDownloadSettings,
IntegrationSupportedFormats as ActionFormat,
IntegrationSupportedFormattings as ActionFormatting,
IntegrationSupportedVisualizationFormattings as ActionVisualizationFormatting,
} from "../api_types/integration"
import { Query } from "../api_types/query"
import { Fieldset } from "./index"
import { Row as JsonDetailRow } from "./json_detail"
export {
ActionType,
ActionFormat,
ActionFormatting,
ActionVisualizationFormatting,
ActionDownloadSettings,
}
export interface ParamMap {
[name: string]: string | undefined
}
export interface ActionAttachment {
dataBuffer?: Buffer
encoding?: string
dataJSON?: any
mime?: string
fileExtension?: string
}
export interface ActionScheduledPlan {
/** ID of the scheduled plan */
scheduledPlanId?: number | null
/** Title of the scheduled plan. */
title?: string | null
/** Type of content of the scheduled plan. Valid values are: "Look", "Dashboard". */
type?: DataWebhookPayloadScheduledPlanType
/** URL of the content item in Looker. */
url?: string | null
/** ID of the query that the data payload represents. */
queryId?: number | null
/** Query that was run (not available for dashboards) */
query?: Query | null
/** A boolean representing whether this schedule payload has customized the filter values. */
filtersDifferFromLook?: boolean
/** A string to be included in scheduled integrations if this scheduled plan is a download query */
downloadUrl?: string | null
}
export class ActionRequest {
static fromRequest(request: express.Request) {
const actionRequest = this.fromJSON(request.body)
actionRequest.instanceId = request.header("x-looker-instance")
actionRequest.webhookId = request.header("x-looker-webhook-id")
const userAgent = request.header("user-agent")
if (userAgent) {
const version = userAgent.split("LookerOutgoingWebhook/")[1]
actionRequest.lookerVersion = semver.valid(version)
}
return actionRequest
}
// Used to turn json back into an actionRequest
static fromIPC(json: any) {
const actionRequest = new ActionRequest()
Object.assign(actionRequest, json)
if (actionRequest.attachment && actionRequest.attachment.dataBuffer) {
actionRequest.attachment.dataBuffer = Buffer.from(json.attachment.dataBuffer)
}
return actionRequest
}
static fromJSON(json?: DataWebhookPayload) {
if (!json) {
throw "Request body must be valid JSON."
}
const request = new ActionRequest()
if (json.type === null) {
throw `Action did not specify a "type".`
} else {
request.type = json.type
}
if (json.attachment) {
request.attachment = {}
request.attachment.mime = json.attachment.mimetype!
request.attachment.fileExtension = json.attachment.extension!
if (request.attachment.mime && json.attachment.data) {
if (json.attachment.data) {
request.attachment.encoding = request.attachment.mime.endsWith(";base64") ? "base64" : "utf8"
request.attachment.dataBuffer = Buffer.from(json.attachment.data, request.attachment.encoding)
if (request.attachment.mime === "application/json") {
request.attachment.dataJSON = JSON.parse(json.attachment.data)
}
}
}
}
if (json.scheduled_plan) {
request.scheduledPlan = {
filtersDifferFromLook: json.scheduled_plan.filters_differ_from_look,
queryId: json.scheduled_plan.query_id,
query: json.scheduled_plan.query,
scheduledPlanId: json.scheduled_plan.scheduled_plan_id,
title: json.scheduled_plan.title,
type: json.scheduled_plan.type,
url: json.scheduled_plan.url,
downloadUrl: json.scheduled_plan.download_url,
}
}
if (json.data) {
request.params = json.data
}
if (json.form_params) {
request.formParams = json.form_params
}
return request
}
attachment?: ActionAttachment
formParams: ParamMap = {}
params: ParamMap = {}
scheduledPlan?: ActionScheduledPlan
type!: ActionType
actionId?: string
instanceId?: string
webhookId?: string
lookerVersion: string | null = null
/** `stream` creates and manages a stream of the request data
*
* ```ts
* let prom = await request.stream(async (readable) => {
* return myService.uploadStreaming(readable).promise()
* })
* ```
*
* Streaming generally occurs only if Looker sends the data in a streaming fashion via a push url,
* however it will also wrap non-streaming attachment data so that actions only need a single implementation.
*
* @returns A promise returning the same value as the callback's return value.
* This promise will resolve after the stream has completed and the callback's promise
* has also resolved.
* @param callback A function will be caled with a Node.js `Readable` object.
* The readable object represents the streaming data.
*/
async stream<T>(callback: (readable: Readable) => Promise<T>): Promise<T> {
const stream = new PassThrough()
const returnPromise = callback(stream)
const timeout = process.env.ACTION_HUB_STREAM_REQUEST_TIMEOUT ?
parseInt(process.env.ACTION_HUB_STREAM_REQUEST_TIMEOUT, 10)
:
13 * 60 * 1000
const url = this.scheduledPlan && this.scheduledPlan.downloadUrl
const streamPromise = new Promise<void>((resolve, reject) => {
if (url) {
winston.info(`[stream] beginning stream via download url`, this.logInfo)
let hasResolved = false
httpRequest
.get(url, {timeout})
.on("error", (err) => {
if (hasResolved && (err as any).code === "ECONNRESET") {
winston.info(`[stream] ignoring ECONNRESET that occured after streaming finished`, this.logInfo)
} else {
winston.error(`[stream] request stream error`, {
...this.logInfo,
error: err.message,
stack: err.stack,
})
reject(err)
}
})
.on("finish", () => {
winston.info(`[stream] streaming via download url finished`, this.logInfo)
})
.on("socket", (socket) => {
winston.info(`[stream] setting keepalive on socket`, this.logInfo)
socket.setKeepAlive(true)
})
.on("abort", () => {
winston.info(`[stream] streaming via download url aborted`, this.logInfo)
})
.on("response", () => {
winston.info(`[stream] got response from download url`, this.logInfo)
})
.on("close", () => {
winston.info(`[stream] request stream closed`, this.logInfo)
})
.pipe(stream)
.on("error", (err) => {
winston.error(`[stream] PassThrough stream error`, {
...this.logInfo,
error: err,
stack: err.stack,
})
reject(err)
})
.on("finish", () => {
winston.info(`[stream] PassThrough stream finished`, this.logInfo)
resolve()
hasResolved = true
})
.on("close", () => {
winston.info(`[stream] PassThrough stream closed`, this.logInfo)
})
} else {
if (this.attachment && this.attachment.dataBuffer) {
winston.info(`Using "fake" streaming because request contained attachment data.`, this.logInfo)
stream.end(this.attachment.dataBuffer)
resolve()
} else {
reject(
"startStream was called on an ActionRequest that does not have" +
"a streaming download url or an attachment. Ensure usesStreaming is set properly on the action.")
}
}
})
const results = await Promise.all([returnPromise, streamPromise])
return results[0]
}
/**
* A streaming helper for the "json" data format. It handles automatically parsing
* the JSON in a streaming fashion. You just need to implement a function that will
* be called for each row.
*
* ```ts
* await request.streamJson((row) => {
* // This will be called for each row of data
* })
* ```
*
* @returns A promise that will be resolved when streaming is complete.
* @param onRow A function that will be called for each streamed row, with the row as the first argument.
*/
async streamJson(onRow: (row: { [fieldName: string]: any }) => void) {
return new Promise<void>((resolve, reject) => {
let rows = 0
this.stream(async (readable) => {
oboe(readable)
.node("![*]", this.safeOboe(readable, reject, (row) => {
rows++
onRow(row)
}))
.done(() => {
winston.info(`[streamJson] oboe reports done`, {...this.logInfo, rows})
})
}).then(() => {
winston.info(`[streamJson] complete`, {...this.logInfo, rows})
resolve()
}).catch((error) => {
// This error should not be logged as it could come from an action
// which might decide to include user information in the error message
winston.info(`[streamJson] reported an error`, {...this.logInfo, rows})
reject(error)
})
})
}
/**
* A streaming helper for the "json_detail" data format. It handles automatically parsing
* the JSON in a streaming fashion. You can implement an `onFields` callback to get
* the field metadata, and an `onRow` callback for each row of data.
*
* ```ts
* await request.streamJsonDetail({
* onFields: (fields) => {
* // This will be called when fields are available
* },
* onRow: (row) => {
* // This will be called for each row of data
* },
* })
* ```
*
* @returns A promise that will be resolved when streaming is complete.
* @param callbacks An object consisting of several callbacks that will be called
* when various parts of the data are parsed.
*/
async streamJsonDetail(callbacks: {
onRow: (row: JsonDetailRow) => void,
onFields?: (fields: Fieldset) => void,
onRanAt?: (iso8601string: string) => void,
}) {
return new Promise<void>((resolve, reject) => {
let rows = 0
this.stream(async (readable) => {
oboe(readable)
.node("data.*", this.safeOboe(readable, reject, (row) => {
rows++
callbacks.onRow(row)
}))
.node("!.fields", this.safeOboe(readable, reject, (fields) => {
if (callbacks.onFields) {
callbacks.onFields(fields)
}
}))
.node("!.ran_at", this.safeOboe(readable, reject, (ranAt) => {
if (callbacks.onRanAt) {
callbacks.onRanAt(ranAt)
}
}))
.done(() => {
winston.info(`[streamJsonDetail] oboe reports done`, {...this.logInfo, rows})
})
}).then(() => {
winston.info(`[streamJsonDetail] complete`, {...this.logInfo, rows})
resolve()
}).catch((error) => {
// This error should not be logged as it could come from an action
// which might decide to include user information in the error message
winston.info(`[streamJsonDetail] reported an error`, {...this.logInfo, rows})
reject(error)
})
})
}
suggestedFilename() {
if (this.attachment) {
if (this.scheduledPlan && this.scheduledPlan.title) {
return sanitizeFilename(`${this.scheduledPlan.title}.${this.attachment.fileExtension}`)
} else {
return sanitizeFilename(`looker_file_${Date.now()}.${this.attachment.fileExtension}`)
}
}
}
/** creates a truncated message with a max number of lines and max number of characters with Title, Url,
* and truncated Body of payload
* @param {number} maxLines - maximum number of lines to truncate message
* @param {number} maxCharacters - maximum character to truncate
*/
suggestedTruncatedMessage(maxLines: number, maxCharacters: number) {
if (this.attachment && this.attachment.dataBuffer) {
let title = ""
let url = ""
if (this.scheduledPlan) {
if (this.scheduledPlan.title) {
title = `${this.scheduledPlan.title}:\n`
}
if (this.scheduledPlan.url) {
url = this.scheduledPlan.url
title = title + url + "\n"
}
}
const truncatedLines = this.attachment.dataBuffer
.toString("utf8")
.split("\n")
.slice(0, maxLines)
if (truncatedLines.length === maxLines) {
truncatedLines.push("")
}
const newMessage = truncatedLines.join("\n")
let body = title + newMessage
body = truncateString(body, maxCharacters)
return body
}
}
private get logInfo() {
return {webhookId: this.webhookId}
}
private safeOboe(
stream: Readable,
reject: (reason?: any) => void,
callback: (node: any) => void,
) {
const logInfo = this.logInfo
return function(this: oboe.Oboe, node: any) {
try {
callback(node)
return oboe.drop
} catch (e) {
winston.info(`safeOboe callback produced an error, aborting stream`, logInfo)
this.abort()
stream.destroy()
reject(e)
}
}
}
}