Skip to content

Commit

Permalink
fix: kubectl watcher should auto-terminate, if given a bound up front
Browse files Browse the repository at this point in the history
Fixes #6417
  • Loading branch information
starpit committed Dec 21, 2020
1 parent 38a112a commit 3e52272
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 10 deletions.
51 changes: 49 additions & 2 deletions plugins/plugin-kubectl/src/controller/kubectl/watch/get-watch.ts
Expand Up @@ -18,6 +18,7 @@ import Debug from 'debug'
import prettyPrintDuration from 'pretty-ms'

import {
Row,
Table,
Arguments,
CodedError,
Expand Down Expand Up @@ -51,6 +52,8 @@ import {
formatTable
} from '../../../lib/view/formatTable'

import TrafficLight from '../../../lib/model/traffic-light'

const strings = i18n('plugin-kubectl', 'events')
const debug = Debug('plugin-kubectl/controller/watch/watcher')

Expand Down Expand Up @@ -125,6 +128,10 @@ export class EventWatcher implements Abortable, Watcher {
) {}

public abort() {
// A condition variable to guard for the case we were asked to
// abort very early on, before the PTY has even called us back
// ("abort-before-init"). See `onPTYInitDone` for the matching
// check on this variable:
this.shouldAbort = true

if (this.ptyJob) {
Expand Down Expand Up @@ -243,15 +250,32 @@ class KubectlWatcher implements Abortable, Watcher {
/** the table push API */
private pusher: WatchPusher

/**
* We may have been given a "limit" argument, which tells us how
* many Completed/Succeeded/Done rows to expect. We will
* auto-terminate the push notification channel upon reaching this
* limit.
*
*/
private readonly limit: number

/**
* @param output This is the output format that the user desired. Below, we
* formulate a watch query to the apiserver with a different
* schema. We will need sufficient discriminants to index a row
* update into an existing table. We cannot be certain that the
* schema the *user* requested satisfies this requirement.
*/
// eslint-disable-next-line no-useless-constructor
public constructor(private readonly args: Arguments<KubeOptions>, private readonly output = formatOf(args)) {}
public constructor(private readonly args: Arguments<KubeOptions>, private readonly output = formatOf(args)) {
this.limit =
args.parsedOptions.limit ||
(args.execOptions.data &&
typeof args.execOptions.data === 'object' &&
!Buffer.isBuffer(args.execOptions.data) &&
typeof args.execOptions.data.limit === 'number'
? args.execOptions.data.limit
: undefined)
}

/**
* Our impl of `Abortable` for use by the table view
Expand Down Expand Up @@ -366,6 +390,12 @@ class KubectlWatcher implements Abortable, Watcher {
}
}

/** Does this row signify a completed state? */
private isDone(row: Row) {
const statusAttr = row.attributes.find(_ => /STATUS/i.test(_.key))
return statusAttr && statusAttr.css && statusAttr.css === TrafficLight.Blue
}

/**
* Our impl of the `onInit` streaming PTY API: the PTY calls us with
* the PTY job (so that we can abort it, if we want). In return, we
Expand All @@ -378,6 +408,10 @@ class KubectlWatcher implements Abortable, Watcher {
debug('onPTYInit')
this.ptyJob.push(ptyJob)

// These help us with managing the countdown latch. See the comments for this.limit.
let remaining = this.limit
const markedAsDone: Record<string, boolean> = {}

return async (_: Streamable) => {
if (typeof _ === 'string') {
// <-- strings flowing out of the PTY
Expand Down Expand Up @@ -426,11 +460,24 @@ class KubectlWatcher implements Abortable, Watcher {
this.pusher.offline(row.name)
} else {
this.pusher.update(row, true, !isEvent)

const nameAttr = row.attributes.find(_ => /NAME/i.test(_.key))
const name = nameAttr ? nameAttr.value : row.name
if (this.limit && !markedAsDone[name] && this.isDone(row)) {
// we were asked to look for this.limit completions; we just saw one, so count down the latch
markedAsDone[name] = true
remaining--
}
}
})

// batch update done!
this.pusher.batchUpdateDone()

if (this.limit && remaining <= 0) {
debug('Aborting PTY channel, due to having observed the expected number of completions')
this.abort()
}
}
} else {
console.error('unknown streamable type', _)
Expand Down
10 changes: 6 additions & 4 deletions plugins/plugin-kubectl/src/lib/view/formatTable.ts
Expand Up @@ -329,19 +329,21 @@ export const formatTable = async <O extends KubeOptions>(
key,
tag: idx > 0 && tagForKey[key],
onclick: colIdx + 1 === nameColumnIdx && onclick, // see the onclick comment: above ^^^; +1 because of slice(1)
outerCSS:
outerCSS: (
header +
' ' +
(outerCSSForKey[key] || '') +
(colIdx <= 1 || colIdx === nameColumnIdx - 1 || columnVisibleWithSidecar.test(key)
? ''
: ' hide-with-sidecar'), // nameColumnIndex - 1 beacuse of rows.slice(1)
css:
: ' hide-with-sidecar')
).trim(), // nameColumnIndex - 1 beacuse of rows.slice(1)
css: (
css +
' ' +
((idx > 0 && cssForKey[key]) || '') +
' ' +
(cssForValue[column] || (key === 'READY' && cssForReadyCount(column)) || maybeRed(column)),
(cssForValue[column] || (key === 'READY' && cssForReadyCount(column)) || maybeRed(column))
).trim(),
value: key === 'STATUS' && idx > 0 ? column || strings('Unknown') : column
})
)
Expand Down
6 changes: 3 additions & 3 deletions plugins/plugin-s3/src/jobs/providers/CodeEngine2.ts
Expand Up @@ -58,8 +58,8 @@ export default class CodeEngine /* implements JobProvider<JobName> */ {
}

/** Show the progress of the job */
public async show(jobName: JobName) {
return this.repl.qexec<Table>(`ibmcloud ce jobrun list ${jobName} --watch`)
public async show(jobName: JobName, nTasks: number) {
return this.repl.qexec<Table>(`ibmcloud ce jobrun list ${jobName} --watch --limit ${nTasks}`)
}

/** -e key=value */
Expand Down Expand Up @@ -87,7 +87,7 @@ export default class CodeEngine /* implements JobProvider<JobName> */ {

const memOpts = params.m || params.memory ? `--memory ${params.m || params.memory}` : ''

const cpuOpts = `--cpu ${params.cpu || '8.0'}`
const cpuOpts = params.cpu ? `--cpu ${params.cpu}` : ''
debug('resource options', memOpts, cpuOpts, params)

const jobrunName = `kui-jobrun-${v4()}`
Expand Down
2 changes: 1 addition & 1 deletion plugins/plugin-s3/src/ssc/scaleOut.ts
Expand Up @@ -32,7 +32,7 @@ class Job {
) {}

public show() {
return this.runner.show(this.jobName)
return this.runner.show(this.jobName, this.nTasks)
}

public wait() {
Expand Down
6 changes: 6 additions & 0 deletions plugins/plugin-s3/src/vfs/index.ts
Expand Up @@ -650,6 +650,9 @@ class S3VFSResponder extends S3VFS implements VFS {
if (!parsedOptions.memory) {
parsedOptions.memory = '1024Mi'
}
if (!parsedOptions.cpu) {
parsedOptions.cpu = 8
}

debug('scale-out gzip sources', srcs, parsedOptions)
return runWithProgress(
Expand All @@ -669,6 +672,9 @@ class S3VFSResponder extends S3VFS implements VFS {
if (!parsedOptions.memory) {
parsedOptions.memory = '1024Mi'
}
if (!parsedOptions.cpu) {
parsedOptions.cpu = 8
}

debug('scale-out gunzip sources', srcs, parsedOptions)
return runWithProgress(
Expand Down

0 comments on commit 3e52272

Please sign in to comment.