Skip to content

Commit

Permalink
Namespaces (#124)
Browse files Browse the repository at this point in the history
  • Loading branch information
kevgo committed Oct 3, 2020
1 parent 38fb703 commit a86e946
Show file tree
Hide file tree
Showing 14 changed files with 257 additions and 224 deletions.
156 changes: 152 additions & 4 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,152 @@
export { ObservableProcess } from "./observable-process"
export { Result } from "./result"
export { SearchableStream } from "./searchable-stream"
export { start } from "./start"
import * as childProcess from "child_process"
import stringArgv from "string-argv"
import mergeStream = require("merge-stream")
import * as util from "util"

import * as scanner from "./scanner"
const delay = util.promisify(setTimeout)

export * from "./scanner"

/** a long-running process whose behavior can be observed at runtime */
export class Process {
/** the underlying ChildProcess instance */
private childProcess: childProcess.ChildProcess

/** populated when the process finishes */
private result: Result | undefined

/** the STDIN stream of the underlying ChildProcess */
stdin: NodeJS.WritableStream

/** searchable STDOUT stream of the underlying ChildProcess */
stdout: scanner.Stream

/** searchable STDERR stream of the underlying ChildProcess */
stderr: scanner.Stream

/** searchable combined STDOUT and STDERR stream */
output: scanner.Stream

/** functions to call when this process ends */
private endedCallbacks: Array<(result: Result) => void>

constructor(args: { cwd: string; env: NodeJS.ProcessEnv; params: string[]; runnable: string }) {
this.endedCallbacks = []
this.childProcess = childProcess.spawn(args.runnable, args.params, {
cwd: args.cwd,
env: args.env,
})
this.childProcess.on("close", this.onClose.bind(this))
if (this.childProcess.stdin == null) {
throw new Error("process.stdin should not be null") // this exists only to make the typechecker shut up
}
this.stdin = this.childProcess.stdin
if (this.childProcess.stdout == null) {
throw new Error("process.stdout should not be null") // NOTE: this exists only to make the typechecker shut up
}
this.stdout = scanner.wrapStream(this.childProcess.stdout)
if (this.childProcess.stderr == null) {
throw new Error("process.stderr should not be null") // NOTE: this exists only to make the typechecker shut up
}
this.stderr = scanner.wrapStream(this.childProcess.stderr)
const outputStream = mergeStream(this.childProcess.stdout, this.childProcess.stderr)
this.output = scanner.wrapStream(outputStream)
}

/** stops the currently running process */
async kill(): Promise<Result> {
this.result = {
exitCode: -1,
killed: true,
stdText: this.stdout.fullText(),
errText: this.stderr.fullText(),
combinedText: this.output.fullText(),
}
this.childProcess.kill()
await delay(1)
return this.result
}

/** returns the process ID of the underlying ChildProcess */
pid(): number {
return this.childProcess.pid
}

/** returns a promise that resolves when the underlying ChildProcess terminates */
async waitForEnd(): Promise<Result> {
if (this.result) {
return this.result
}
return new Promise((resolve) => {
this.endedCallbacks.push(resolve)
})
}

/** called when the underlying ChildProcess terminates */
private onClose(exitCode: number) {
this.result = {
exitCode,
killed: false,
stdText: this.stdout.fullText(),
errText: this.stderr.fullText(),
combinedText: this.output.fullText(),
}
for (const endedCallback of this.endedCallbacks) {
endedCallback(this.result)
}
}
}

/** the result of running a process */
export interface Result {
/** combined output from STDOUT and STDERR */
combinedText: string

/** full output on the STDERR stream */
errText: string

/** the code with which the process has ended */
exitCode: number

/** whether the process was manually terminated by the user */
killed: boolean

/** full output on the STDOUT stream */
stdText: string
}

/** options for start */
export interface StartOptions {
/** the directory to run the process in */
cwd?: string

/** environment variables for the process */
env?: NodeJS.ProcessEnv
}

/** starts a new ObservableProcess with the given options */
export function start(command: string | string[], options: StartOptions = {}): Process {
// determine args
if (!command) {
throw new Error("start: no command to execute given")
}
let argv: string[] = []
// TODO: instanceOfString
if (typeof command === "string") {
argv = stringArgv(command)
} else if (Array.isArray(command)) {
argv = command
} else {
throw new Error("start: you must provide the command to run as a string or string[]")
}
const [runnable, ...params] = argv

// start the process
return new Process({
cwd: options.cwd || process.cwd(),
env: options.env || process.env,
params,
runnable,
})
}
97 changes: 0 additions & 97 deletions src/observable-process.ts

This file was deleted.

17 changes: 0 additions & 17 deletions src/result.ts

This file was deleted.

15 changes: 5 additions & 10 deletions src/searchable-stream.ts → src/scanner.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,17 @@
import { TextStreamSearch } from "text-stream-search"

/**
* the API we use to search streams for text or regular expressions
*/
/** the API to search streams for text or regular expressions */
interface TextStreamSearcher {
fullText(): string
waitForRegex(regex: RegExp, timeout?: number): Promise<string>
waitForText(text: string, timeout?: number): Promise<string>
}

/**
* A NodeJS.ReadableStream decorated with additional capabilities
* to search the stream content.
*/
export type SearchableStream = NodeJS.ReadableStream & TextStreamSearcher
/** A NodeJS.ReadableStream that can search the stream content. */
export type Stream = NodeJS.ReadableStream & TextStreamSearcher

export function createSearchableStream(stream: NodeJS.ReadableStream): SearchableStream {
const result = stream as SearchableStream
export function wrapStream(stream: NodeJS.ReadableStream): Stream {
const result = stream as Stream
const search = new TextStreamSearch(stream)
result.waitForText = async function (text: string, timeout?: number) {
return search.waitForText(text, timeout)
Expand Down
34 changes: 0 additions & 34 deletions src/start.ts

This file was deleted.

6 changes: 0 additions & 6 deletions test/helpers/start-node-process.ts

This file was deleted.

10 changes: 6 additions & 4 deletions test/input-test.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import { strict as assert } from "assert"

import { startNodeProcess } from "./helpers/start-node-process"
import * as observableProcess from "../src/index"

test("ObservableProcess.stdin", async function () {
// start a process that reads from STDIN
const running = startNodeProcess(
const running = observableProcess.start([
"node",
"-e",
`process.stdin.on("data", data => { process.stdout.write(data) });\
process.stdin.on("end", () => { process.stdout.write("\\nEND") })`
)
process.stdin.on("end", () => { process.stdout.write("\\nEND") })`,
])

// write some stuff into the STDIN stream of this process
running.stdin.write("hello")
Expand Down
14 changes: 8 additions & 6 deletions test/kill-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,25 @@ import { strict as assert } from "assert"
import got from "got"
import * as portFinder from "portfinder"

import { startNodeProcess } from "./helpers/start-node-process"
import * as observableProcess from "../src/index"

test("ObservableProcess.kill()", async function () {
this.timeout(8000)

// start a long-running process
const port = await portFinder.getPortPromise()
const longRunningProcess = startNodeProcess(
const process = observableProcess.start([
"node",
"-e",
`http = require('http');\
http.createServer(function(_, res) { res.end('hello') }).listen(${port}, 'localhost');\
console.log('online')`
)
await longRunningProcess.stdout.waitForText("online")
console.log('online')`,
])
await process.stdout.waitForText("online")
await assertIsRunning(port)

// kill the process
const result = await longRunningProcess.kill()
const result = await process.kill()

// verify the process is no longer running
await assertIsNotRunning(port)
Expand Down
Loading

0 comments on commit a86e946

Please sign in to comment.