diff --git a/.changeset/bright-vans-cross.md b/.changeset/bright-vans-cross.md new file mode 100644 index 00000000..ea1083dc --- /dev/null +++ b/.changeset/bright-vans-cross.md @@ -0,0 +1,6 @@ +--- +'@transloadit/notify-url-relay': minor +--- + +Add a new `@transloadit/notify-url-relay` package for running a local Transloadit +`notify_url` relay with fetch-based forwarding, assembly polling, retry logic, and a CLI/TUI. diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6f88a874..e9ad9e8e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -164,6 +164,8 @@ jobs: github.event_name != 'pull_request' || github.event.pull_request.head.repo.full_name == github.repository runs-on: ubuntu-latest + env: + NODE_OPTIONS: --trace-deprecation --trace-warnings steps: - uses: actions/checkout@v4 - uses: actions/setup-node@v4 @@ -181,18 +183,20 @@ jobs: - run: corepack yarn test env: - TRANSLOADIT_KEY: ${{ secrets.TRANSLOADIT_KEY }} - TRANSLOADIT_SECRET: ${{ secrets.TRANSLOADIT_SECRET }} - NODE_OPTIONS: --trace-deprecation --trace-warnings CLOUDFLARED_PATH: ${{ github.workspace }}/cloudflared-linux-amd64 DEBUG: 'transloadit:*' + TRANSLOADIT_KEY: ${{ secrets.TRANSLOADIT_KEY }} + TRANSLOADIT_SECRET: ${{ secrets.TRANSLOADIT_SECRET }} - name: Run MCP server e2e tests run: corepack yarn workspace @transloadit/mcp-server test:e2e + + - name: Run notify-url-relay real e2e test + run: corepack yarn workspace @transloadit/notify-url-relay test:real env: TRANSLOADIT_KEY: ${{ secrets.TRANSLOADIT_KEY }} TRANSLOADIT_SECRET: ${{ secrets.TRANSLOADIT_SECRET }} - NODE_OPTIONS: --trace-deprecation --trace-warnings + TRANSLOADIT_ENDPOINT: ${{ secrets.TRANSLOADIT_ENDPOINT }} - name: Generate the badge from the json-summary run: node --experimental-strip-types packages/node/test/generate-coverage-badge.ts packages/node/coverage/coverage-summary.json diff --git a/docs/fingerprint/transloadit-baseline.json b/docs/fingerprint/transloadit-baseline.json index 594ec9b6..19f2fb2f 100644 --- a/docs/fingerprint/transloadit-baseline.json +++ b/docs/fingerprint/transloadit-baseline.json @@ -1,13 +1,13 @@ { "packageDir": "/home/kvz/code/node-sdk/packages/transloadit", "tarball": { - "filename": "transloadit-4.7.4.tgz", - "sizeBytes": 1247606, - "sha256": "48a0c23cb4652d91594b7ff5a7e16af1397e8da663245f241de49f1057c4330e" + "filename": "transloadit-4.7.5.tgz", + "sizeBytes": 1247607, + "sha256": "8ece4cbf0df04422d189035abb22a80c0565f95e5b10488d0e9226277702e363" }, "packageJson": { "name": "transloadit", - "version": "4.7.4", + "version": "4.7.5", "main": "./dist/Transloadit.js", "exports": { ".": "./dist/Transloadit.js", @@ -473,8 +473,8 @@ }, { "path": "dist/robots.js", - "sizeBytes": 8373, - "sha256": "de7fd519fcf36cbae2d1a3a316dadf331a61e933bcb666a92358502fb3e90433" + "sizeBytes": 8374, + "sha256": "740d92236f9b0319b73f38c9827074747b80f36694906df916039bec8cd421d2" }, { "path": "dist/alphalib/types/robots/s3-import.js", @@ -689,7 +689,7 @@ { "path": "package.json", "sizeBytes": 2730, - "sha256": "0cf46ccf180fd122bfa412623700b21b34b3cd962aad90102ca8ab14256de1c6" + "sha256": "313dd2ac13d3e4857b71bd889b2c9fa7f2458cf2bf5be2dd5a1996eb3d23199d" }, { "path": "dist/alphalib/types/robots/_index.d.ts.map", @@ -1599,7 +1599,7 @@ { "path": "dist/robots.js.map", "sizeBytes": 9412, - "sha256": "79c6c465ced5d07c7d7a3b5124c4486584d43aa86468caafe886f3b291304b13" + "sha256": "72d51dbe2ba7017ec5700bd72f5247142fa2f4cc3cda60c822a92fef0becf841" }, { "path": "dist/alphalib/types/robots/s3-import.d.ts.map", @@ -2938,8 +2938,8 @@ }, { "path": "src/robots.ts", - "sizeBytes": 9642, - "sha256": "fa774b4f6f8b30eb18a94bf2a117bae924d0b80f05e9bb16d7e85a8298e19543" + "sizeBytes": 9643, + "sha256": "7afa1b895f1b28c6eb63b496450e3236e5e20fc649484ba878fd0f6179899499" }, { "path": "dist/alphalib/types/robots/s3-import.d.ts", diff --git a/docs/fingerprint/transloadit-baseline.package.json b/docs/fingerprint/transloadit-baseline.package.json index 63814af2..b1621636 100644 --- a/docs/fingerprint/transloadit-baseline.package.json +++ b/docs/fingerprint/transloadit-baseline.package.json @@ -1,6 +1,6 @@ { "name": "transloadit", - "version": "4.7.4", + "version": "4.7.5", "description": "Node.js SDK for Transloadit", "homepage": "https://github.com/transloadit/node-sdk/tree/main/packages/node", "bugs": { diff --git a/docs/prompts/2026-03-04-notify-url-relay-refactors.md b/docs/prompts/2026-03-04-notify-url-relay-refactors.md new file mode 100644 index 00000000..a4e20050 --- /dev/null +++ b/docs/prompts/2026-03-04-notify-url-relay-refactors.md @@ -0,0 +1,17 @@ +# Notify URL Relay Refactor Follow-ups (2026-03-04) + +Tracking refactors requested after council ideas. Items are completed in order and checked off as they land. + +- [x] 1. Consolidate repeated integration test harness helpers into shared test utilities. +- [x] 2. Unify CLI option definition/validation/help generation with one declarative schema. +- [x] 3. Replace cross-package source import in real E2E with workspace package import, fixing Vitest resolution the right way. +- [x] 4. Remove redundant null guard in `observeTiming`. +- [x] 5. Simplify timeout signal plumbing using Node native Abort APIs while preserving timeout error semantics. +- [x] 6. Deduplicate repeated E2E workflow environment variables at job-level. + +## Council Refactor Run #2 (2026-03-04) + +- [x] 1. Stabilize and DRY test harness setup; remove `getFreePort()` TOCTOU race by binding proxy to port `0`. +- [x] 2. Deduplicate `observeTiming` emission path to a single payload construction. +- [x] 3. Use named export for the relay class (align with repo style guidance). +- [x] 4. Replace avoidable `as` casts with stronger typing/narrowing where practical. diff --git a/knip.ts b/knip.ts index eb344c49..85ee3de1 100644 --- a/knip.ts +++ b/knip.ts @@ -47,6 +47,12 @@ const config: KnipConfig = { ignore: [...sharedIgnore], ignoreDependencies: ['@types/express', '@types/node', 'vitest', 'vitest/config'], }, + 'packages/notify-url-relay': { + entry: ['src/**/*.ts', 'test/**/*.ts', 'vitest.config.ts'], + project: ['{src,test}/**/*.ts'], + ignore: [...sharedIgnore], + ignoreDependencies: ['@types/node', 'vitest', 'vitest/config'], + }, 'packages/transloadit': { entry: [ 'src/Transloadit.ts', diff --git a/package.json b/package.json index facd1407..332fb444 100644 --- a/package.json +++ b/package.json @@ -11,7 +11,7 @@ "verify": "yarn lint:changesets && yarn lint:publish && yarn lint:deps && yarn lint:js && yarn lint:ts && yarn test:unit", "verify:full": "yarn verify && yarn knip && yarn parity:transloadit && yarn test:types", "lint:js": "biome check .", - "lint:ts": "yarn tsc:types && yarn tsc:node && yarn tsc:zod", + "lint:ts": "yarn tsc:types && yarn tsc:node && yarn tsc:zod && yarn workspace @transloadit/notify-url-relay lint:ts", "lint:changesets": "node scripts/guard-changesets.ts", "lint": "yarn lint:js", "fix": "yarn fix:js", @@ -23,7 +23,7 @@ "knip": "yarn run --binaries-only knip --exclude binaries --no-config-hints --no-progress", "pack": "node scripts/pack-transloadit.ts", "parity:transloadit": "node scripts/prepare-transloadit.ts && node scripts/fingerprint-pack.ts packages/transloadit --ignore-scripts --quiet --out /tmp/transloadit-after.json && node scripts/verify-fingerprint.ts --current /tmp/transloadit-after.json --diff", - "test:unit": "yarn workspace @transloadit/node test:unit && yarn workspace @transloadit/mcp-server test:unit && yarn workspace @transloadit/types test:unit && yarn workspace @transloadit/zod test:unit", + "test:unit": "yarn workspace @transloadit/node test:unit && yarn workspace @transloadit/mcp-server test:unit && yarn workspace @transloadit/types test:unit && yarn workspace @transloadit/zod test:unit && yarn workspace @transloadit/notify-url-relay test:unit", "test:types": "yarn workspace @transloadit/zod test:types", "test:e2e": "yarn workspace @transloadit/node test:e2e", "test": "yarn workspace @transloadit/node test", diff --git a/packages/notify-url-relay/.env.example b/packages/notify-url-relay/.env.example new file mode 100644 index 00000000..e7f3835e --- /dev/null +++ b/packages/notify-url-relay/.env.example @@ -0,0 +1,6 @@ +TRANSLOADIT_KEY=your-key +TRANSLOADIT_SECRET=your-secret +# Optional (defaults to https://api2.transloadit.com) +TRANSLOADIT_ENDPOINT=https://api2.transloadit.com +# Optional (0-8 or emerg/alert/crit/err/warn/notice/info/debug/trace) +TRANSLOADIT_LOG_LEVEL=info diff --git a/packages/notify-url-relay/CHANGELOG.md b/packages/notify-url-relay/CHANGELOG.md new file mode 100644 index 00000000..e9d90fec --- /dev/null +++ b/packages/notify-url-relay/CHANGELOG.md @@ -0,0 +1,7 @@ +# @transloadit/notify-url-relay + +## 0.1.0 + +### Minor Changes + +- Initial release of the notify_url proxy package. diff --git a/packages/notify-url-relay/README.md b/packages/notify-url-relay/README.md new file mode 100644 index 00000000..d0561267 --- /dev/null +++ b/packages/notify-url-relay/README.md @@ -0,0 +1,122 @@ +# @transloadit/notify-url-relay + +Local `notify_url` relay for Transloadit Assemblies. This tool polls the status of Assemblies until they complete, then pushes the status to a pingback URL of your choosing. This is useful while on a development machine, which is inaccessible from the public internet and hence can't be notified by Transloadit. + +You can alternatively use a tunnel like ngrok or [Cloudflare Tunnel](https://developers.cloudflare.com/cloudflare-one/networks/connectors/cloudflare-tunnel/) for this, this is just one more way. + +This version is modernized for: + +- Node.js 24+ +- Native TypeScript execution (type stripping) +- ESM +- Yarn 4 +- Biome + Vitest + GitHub Actions + Changesets + +Notify payloads are signed via `@transloadit/utils` using prefixed `sha384` signatures. +Forwarding uses native `fetch`, polling retries use `p-retry`, and logs are emitted via `@transloadit/sev-logger`. +Metrics hooks are available for counters, gauges, and timings. + +## Install + +```bash +npm install @transloadit/notify-url-relay +``` + +## CLI usage + +```bash +export TRANSLOADIT_SECRET="your-secret" + +notify-url-relay \ + --notifyUrl "http://127.0.0.1:3000/transloadit" \ + --port 8888 \ + --notifyOnTerminalError \ + --log-level info +``` + +Run `notify-url-relay --help` for all options. + +Log level accepts `0-8` or names: +`emerg`, `alert`, `crit`, `err`, `warn`, `notice`, `info`, `debug`, `trace`. +You can also set `TRANSLOADIT_LOG_LEVEL`. + +### Reactive TUI Mode + +```bash +notify-url-relay --ui --log-level info +``` + +This opens a live terminal dashboard with: + +- throughput and retry counters +- in-flight queue gauges +- latency sparklines +- streaming logs + +## Programmatic usage + +```ts +import { TransloaditNotifyUrlProxy } from '@transloadit/notify-url-relay' + +const proxy = new TransloaditNotifyUrlProxy( + process.env.TRANSLOADIT_SECRET ?? '', + 'http://127.0.0.1:3000/transloadit' +) + +proxy.run({ + port: 8888, + target: 'https://api2.transloadit.com', + forwardTimeoutMs: 15000, + pollIntervalMs: 2000, + pollMaxIntervalMs: 30000, + pollBackoffFactor: 2, + pollRequestTimeoutMs: 15000, + maxPollAttempts: 10, + maxInFlightPolls: 4, + notifyOnTerminalError: false, + notifyTimeoutMs: 15000, + notifyMaxAttempts: 3, + notifyIntervalMs: 500, + notifyMaxIntervalMs: 5000, + notifyBackoffFactor: 2 +}) +``` + +## Development + +```bash +corepack yarn +corepack yarn workspace @transloadit/notify-url-relay check +corepack yarn workspace @transloadit/notify-url-relay test:real +``` + +## Real API E2E + +Run an opt-in test against the real Transloadit API (default `yarn test` excludes this test): + +```bash +# set locally (for example in .env) +export TRANSLOADIT_KEY="your-key" +export TRANSLOADIT_SECRET="your-secret" +# optional +export TRANSLOADIT_ENDPOINT="https://api2.transloadit.com" + +corepack yarn workspace @transloadit/notify-url-relay test:real +``` + +For CI, configure repository secrets: + +- `TRANSLOADIT_KEY` +- `TRANSLOADIT_SECRET` +- `TRANSLOADIT_ENDPOINT` (optional) + +## Releases + +Changesets drives releases for this package: + +```bash +corepack yarn changeset +corepack yarn changeset:version +``` + +On pushes to `main`, `.github/workflows/release.yml` runs `changesets/action` to publish. diff --git a/packages/notify-url-relay/package.json b/packages/notify-url-relay/package.json new file mode 100644 index 00000000..3c3a3f5b --- /dev/null +++ b/packages/notify-url-relay/package.json @@ -0,0 +1,52 @@ +{ + "name": "@transloadit/notify-url-relay", + "version": "0.1.0", + "description": "Local notify_url relay for Transloadit assemblies.", + "type": "module", + "license": "MIT", + "packageManager": "yarn@4.12.0", + "engines": { + "node": ">= 24" + }, + "repository": { + "type": "git", + "url": "https://github.com/transloadit/node-sdk", + "directory": "packages/notify-url-relay" + }, + "files": [ + "dist", + "README.md" + ], + "main": "./dist/index.js", + "types": "./dist/index.d.ts", + "exports": { + ".": { + "types": "./dist/index.d.ts", + "default": "./dist/index.js" + }, + "./package.json": "./package.json" + }, + "bin": "./dist/cli.js", + "scripts": { + "lint:ts": "../../node_modules/.bin/tsc --build tsconfig.build.json", + "build": "yarn lint:ts && chmod +x dist/cli.js", + "test:unit": "yarn --cwd ../.. tsc:zod && ../../node_modules/.bin/vitest run", + "test:real": "yarn --cwd ../.. tsc:zod && RUN_REAL_E2E=1 ../../node_modules/.bin/vitest run test/real.e2e.test.ts", + "check": "yarn lint:ts && yarn test:unit", + "prepack": "yarn build" + }, + "dependencies": { + "@transloadit/sev-logger": "^0.1.9", + "@transloadit/utils": "^4.3.0", + "@transloadit/zod": "^4.3.0", + "dotenv": "^17.2.3", + "p-retry": "^7.1.1" + }, + "devDependencies": { + "@types/node": "^24.10.3", + "transloadit": "^4.7.4" + }, + "publishConfig": { + "tag": "experimental" + } +} diff --git a/packages/notify-url-relay/src/cli.ts b/packages/notify-url-relay/src/cli.ts new file mode 100755 index 00000000..d5e8b316 --- /dev/null +++ b/packages/notify-url-relay/src/cli.ts @@ -0,0 +1,631 @@ +#!/usr/bin/env node + +import { parseArgs } from 'node:util' +import { SevLogger } from '@transloadit/sev-logger' +import { config as loadDotEnv } from 'dotenv' +import type { + CounterMetricEvent, + GaugeMetricEvent, + ProxyLogEvent, + ProxyRuntimeOptions, + ProxySettings, + TimingMetricEvent, +} from './index.ts' +import { TransloaditNotifyUrlProxy } from './index.ts' + +loadDotEnv({ quiet: true }) + +const LOCAL_HOSTS = new Set(['localhost', '127.0.0.1', '::1']) + +const LOG_LEVEL_BY_NAME = { + emerg: SevLogger.LEVEL.EMERG, + alert: SevLogger.LEVEL.ALERT, + crit: SevLogger.LEVEL.CRIT, + err: SevLogger.LEVEL.ERR, + error: SevLogger.LEVEL.ERR, + warn: SevLogger.LEVEL.WARN, + warning: SevLogger.LEVEL.WARN, + notice: SevLogger.LEVEL.NOTICE, + info: SevLogger.LEVEL.INFO, + debug: SevLogger.LEVEL.DEBUG, + trace: SevLogger.LEVEL.TRACE, +} as const + +function isLogLevelName(value: string): value is keyof typeof LOG_LEVEL_BY_NAME { + return Object.hasOwn(LOG_LEVEL_BY_NAME, value) +} + +function fail(message: string): never { + console.error(message) + process.exit(1) +} + +function parsePositiveIntOption( + name: string, + value: string, + max = Number.MAX_SAFE_INTEGER, +): number { + const parsed = Number.parseInt(value, 10) + if (!Number.isInteger(parsed) || parsed <= 0 || parsed > max) { + fail(`Invalid ${name}: ${value}`) + } + return parsed +} + +function parsePositiveFloatOption( + name: string, + value: string, + min = Number.MIN_VALUE, + max = Number.MAX_SAFE_INTEGER, +): number { + const parsed = Number.parseFloat(value) + if (!Number.isFinite(parsed) || parsed < min || parsed > max) { + fail(`Invalid ${name}: ${value}`) + } + return parsed +} + +function parseLogLevelOption(value: string): number { + const normalized = value.trim().toLowerCase() + const parsedNumeric = Number.parseInt(normalized, 10) + + if ( + Number.isInteger(parsedNumeric) && + parsedNumeric >= SevLogger.LEVEL.EMERG && + parsedNumeric <= SevLogger.LEVEL.TRACE + ) { + return parsedNumeric + } + + if (isLogLevelName(normalized)) { + return LOG_LEVEL_BY_NAME[normalized] + } + + fail( + `Invalid log level: ${value}. Use 0-8 or one of ${Object.keys(LOG_LEVEL_BY_NAME).join(', ')}.`, + ) +} + +function parseHttpUrlOption(name: string, value: string): URL { + let parsed: URL + + try { + parsed = new URL(value) + } catch { + fail(`Invalid ${name}: ${value}`) + } + + if (!['http:', 'https:'].includes(parsed.protocol)) { + fail(`Invalid ${name} protocol: ${parsed.protocol}. Use http or https.`) + } + if (!parsed.hostname) { + fail(`Invalid ${name}: missing hostname.`) + } + + return parsed +} + +function parseNotifyUrlOption(value: string): string { + const parsed = parseHttpUrlOption('notifyUrl', value) + const normalizedHost = parsed.hostname.replace(/^\[(.*)\]$/, '$1').toLowerCase() + if (parsed.protocol === 'http:' && !LOCAL_HOSTS.has(normalizedHost)) { + fail('Insecure notifyUrl over HTTP is only allowed for localhost/127.0.0.1/::1.') + } + + return parsed.toString() +} + +function sparkline(values: number[], width = 44): string { + const chars = '▁▂▃▄▅▆▇█' + const tail = values.slice(-width) + + if (tail.length === 0) { + return '·'.repeat(width) + } + + const max = Math.max(...tail, 1) + return tail + .map((value) => { + const ratio = value / max + const index = Math.max(0, Math.min(chars.length - 1, Math.round(ratio * (chars.length - 1)))) + return chars[index] + }) + .join('') + .padStart(width, '·') +} + +function createTuiMode(logLevel: number | undefined): { + runtimeOptions: ProxyRuntimeOptions + start: () => void + stop: () => void +} { + const counters: Record = {} + const gauges: Record = {} + const timings: Record = {} + const logs: ProxyLogEvent[] = [] + const series: { + forward: number[] + pollRetry: number[] + notifyOk: number[] + inflight: number[] + latencyForward: number[] + latencyNotify: number[] + } = { + forward: [], + pollRetry: [], + notifyOk: [], + inflight: [], + latencyForward: [], + latencyNotify: [], + } + + const startedAt = Date.now() + let timer: NodeJS.Timeout | null = null + + const pushSeries = (key: keyof typeof series, value: number): void => { + const bucket = series[key] + bucket.push(value) + if (bucket.length > 160) { + bucket.splice(0, bucket.length - 160) + } + } + + const onCounter = (event: CounterMetricEvent): void => { + counters[event.name] = event.total + if (event.name === 'forward.requests_total') pushSeries('forward', event.total) + if (event.name === 'poll.retry_total') pushSeries('pollRetry', event.total) + if (event.name === 'notify.success_total') pushSeries('notifyOk', event.total) + } + + const onGauge = (event: GaugeMetricEvent): void => { + gauges[event.name] = event.value + if (event.name === 'poll.in_flight') pushSeries('inflight', event.value) + } + + const onTiming = (event: TimingMetricEvent): void => { + timings[event.name] = event + if (event.name === 'forward.request_duration_ms') pushSeries('latencyForward', event.durationMs) + if (event.name === 'notify.duration_ms') pushSeries('latencyNotify', event.durationMs) + } + + const onLog = (event: ProxyLogEvent): void => { + logs.push(event) + if (logs.length > 160) { + logs.splice(0, logs.length - 160) + } + } + + const runtimeOptions: ProxyRuntimeOptions = { + ...(typeof logLevel === 'number' ? { logLevel } : {}), + metricsHooks: { + onCounter, + onGauge, + onTiming, + }, + onLog, + } + + const render = (): void => { + const uptimeSec = Math.max(0, Math.floor((Date.now() - startedAt) / 1000)) + const h = Math.floor(uptimeSec / 3600) + const m = Math.floor((uptimeSec % 3600) / 60) + const s = uptimeSec % 60 + const uptime = `${String(h).padStart(2, '0')}:${String(m).padStart(2, '0')}:${String(s).padStart(2, '0')}` + + const lines: string[] = [] + lines.push('\x1b[2J\x1b[H') + lines.push('\x1b[38;5;117mNotify URL Relay // Reactive TUI\x1b[0m') + lines.push(`Uptime: ${uptime} | Press Ctrl+C to exit`) + lines.push('') + + lines.push('METRICS') + lines.push( + ` forward.requests_total: ${counters['forward.requests_total'] ?? 0} poll.retry_total: ${counters['poll.retry_total'] ?? 0}`, + ) + lines.push( + ` notify.success_total : ${counters['notify.success_total'] ?? 0} poll.in_flight : ${gauges['poll.in_flight'] ?? 0}`, + ) + lines.push( + ` forward p50-ish last: ${Math.round(timings['forward.request_duration_ms']?.avgMs ?? 0)}ms notify avg: ${Math.round(timings['notify.duration_ms']?.avgMs ?? 0)}ms`, + ) + lines.push('') + + lines.push('GRAPHS') + lines.push(` Forward Throughput ${sparkline(series.forward)}`) + lines.push(` Poll Retries ${sparkline(series.pollRetry)}`) + lines.push(` Notify Success ${sparkline(series.notifyOk)}`) + lines.push(` In-Flight Polls ${sparkline(series.inflight)}`) + lines.push(` Forward Latency ${sparkline(series.latencyForward)}`) + lines.push(` Notify Latency ${sparkline(series.latencyNotify)}`) + lines.push('') + + lines.push('LIVE LOGS') + for (const log of logs.slice(-18)) { + const time = new Date(log.at).toLocaleTimeString() + const level = log.level.padEnd(6, ' ') + lines.push(` ${time} ${level} ${log.message}`) + } + + process.stdout.write(lines.join('\n')) + } + + const start = (): void => { + if (process.stdout.isTTY) { + process.stdout.write('\x1b[?25l') + } + timer = setInterval(render, 180) + render() + } + + const stop = (): void => { + if (timer) { + clearInterval(timer) + timer = null + } + if (process.stdout.isTTY) { + process.stdout.write('\x1b[?25h\n') + } + } + + return { runtimeOptions, start, stop } +} + +type CliState = { + help: boolean + logLevelRaw: string | undefined + notifyUrlRaw: string | undefined + settings: Partial + ui: boolean +} + +type CliOptionDefinitionBase = { + aliases?: string[] + description: string + key: string + short?: string + usage: string +} + +type CliOptionDefinitionString = CliOptionDefinitionBase & { + apply: (value: string, state: CliState) => void + type: 'string' +} + +type CliOptionDefinitionBoolean = CliOptionDefinitionBase & { + apply: (value: boolean, state: CliState) => void + type: 'boolean' +} + +type CliOptionDefinition = CliOptionDefinitionString | CliOptionDefinitionBoolean + +const CLI_OPTIONS: CliOptionDefinition[] = [ + { + key: 'notifyUrl', + type: 'string', + usage: '--notifyUrl ', + description: 'URL to send notifications to (http://localhost allowed, otherwise https)', + apply: (value, state) => { + state.notifyUrlRaw = parseNotifyUrlOption(value) + }, + }, + { + key: 'target', + type: 'string', + usage: '--target ', + description: 'Transloadit endpoint base URL', + apply: (value, state) => { + state.settings.target = parseHttpUrlOption('target', value).toString() + }, + }, + { + key: 'port', + type: 'string', + usage: '--port ', + description: 'Local listen port', + apply: (value, state) => { + state.settings.port = parsePositiveIntOption('port', value, 65_535) + }, + }, + { + key: 'forwardTimeoutMs', + type: 'string', + usage: '--forwardTimeoutMs ', + description: 'Forward request timeout in milliseconds', + apply: (value, state) => { + state.settings.forwardTimeoutMs = parsePositiveIntOption('forwardTimeoutMs', value) + }, + }, + { + key: 'pollIntervalMs', + type: 'string', + usage: '--pollIntervalMs ', + description: 'Base poll retry interval in milliseconds', + apply: (value, state) => { + state.settings.pollIntervalMs = parsePositiveIntOption('pollIntervalMs', value) + }, + }, + { + key: 'pollMaxIntervalMs', + type: 'string', + usage: '--pollMaxIntervalMs ', + description: 'Max poll retry interval in milliseconds', + apply: (value, state) => { + state.settings.pollMaxIntervalMs = parsePositiveIntOption('pollMaxIntervalMs', value) + }, + }, + { + key: 'pollBackoffFactor', + type: 'string', + usage: '--pollBackoffFactor ', + description: 'Poll retry backoff factor (>= 1)', + apply: (value, state) => { + state.settings.pollBackoffFactor = parsePositiveFloatOption('pollBackoffFactor', value, 1) + }, + }, + { + key: 'pollRequestTimeoutMs', + type: 'string', + usage: '--pollRequestTimeoutMs ', + description: 'Per poll request timeout in milliseconds', + apply: (value, state) => { + state.settings.pollRequestTimeoutMs = parsePositiveIntOption('pollRequestTimeoutMs', value) + }, + }, + { + key: 'maxPollAttempts', + type: 'string', + usage: '--maxPollAttempts ', + description: 'Max number of poll attempts', + apply: (value, state) => { + state.settings.maxPollAttempts = parsePositiveIntOption('maxPollAttempts', value) + }, + }, + { + key: 'maxInFlightPolls', + type: 'string', + usage: '--maxInFlightPolls ', + description: 'Max number of active assembly pollers', + apply: (value, state) => { + state.settings.maxInFlightPolls = parsePositiveIntOption('maxInFlightPolls', value) + }, + }, + { + key: 'notifyOnTerminalError', + aliases: ['notify-on-terminal-error'], + type: 'boolean', + usage: '--notifyOnTerminalError', + description: 'Send notify payload when terminal error is reached', + apply: (value, state) => { + if (value) { + state.settings.notifyOnTerminalError = true + } + }, + }, + { + key: 'notifyTimeoutMs', + type: 'string', + usage: '--notifyTimeoutMs ', + description: 'Per notify request timeout in milliseconds', + apply: (value, state) => { + state.settings.notifyTimeoutMs = parsePositiveIntOption('notifyTimeoutMs', value) + }, + }, + { + key: 'notifyMaxAttempts', + type: 'string', + usage: '--notifyMaxAttempts ', + description: 'Max number of notify attempts', + apply: (value, state) => { + state.settings.notifyMaxAttempts = parsePositiveIntOption('notifyMaxAttempts', value) + }, + }, + { + key: 'notifyIntervalMs', + type: 'string', + usage: '--notifyIntervalMs ', + description: 'Base notify retry interval in milliseconds', + apply: (value, state) => { + state.settings.notifyIntervalMs = parsePositiveIntOption('notifyIntervalMs', value) + }, + }, + { + key: 'notifyMaxIntervalMs', + type: 'string', + usage: '--notifyMaxIntervalMs ', + description: 'Max notify retry interval in milliseconds', + apply: (value, state) => { + state.settings.notifyMaxIntervalMs = parsePositiveIntOption('notifyMaxIntervalMs', value) + }, + }, + { + key: 'notifyBackoffFactor', + type: 'string', + usage: '--notifyBackoffFactor ', + description: 'Notify retry backoff factor (>= 1)', + apply: (value, state) => { + state.settings.notifyBackoffFactor = parsePositiveFloatOption('notifyBackoffFactor', value, 1) + }, + }, + { + key: 'ui', + type: 'boolean', + usage: '--ui', + description: 'Enable reactive terminal dashboard (TUI)', + apply: (value, state) => { + state.ui = value + }, + }, + { + key: 'log-level', + aliases: ['logLevel'], + short: 'l', + type: 'string', + usage: '-l, --log-level ', + description: 'Log level (0-8 or emerg/alert/crit/err/warn/notice/info/debug/trace)', + apply: (value, state) => { + state.logLevelRaw = value + }, + }, + { + key: 'help', + short: 'h', + type: 'boolean', + usage: '-h, --help', + description: 'Show this help', + apply: (value, state) => { + state.help = value + }, + }, +] + +function getOptionNames(definition: CliOptionDefinition): string[] { + return [definition.key, ...(definition.aliases ?? [])] +} + +function buildParseOptions( + definitions: CliOptionDefinition[], +): Record { + const options: Record = {} + + for (const definition of definitions) { + for (const name of getOptionNames(definition)) { + options[name] = { + type: definition.type, + ...(name === definition.key && definition.short ? { short: definition.short } : {}), + } + } + } + + return options +} + +function readStringOption( + values: Record, + definition: CliOptionDefinitionString, +): string | undefined { + for (const name of getOptionNames(definition)) { + const value = values[name] + if (typeof value === 'string') { + return value + } + } + + return undefined +} + +function readBooleanOption( + values: Record, + definition: CliOptionDefinitionBoolean, +): boolean { + for (const name of getOptionNames(definition)) { + if (values[name] === true) { + return true + } + } + + return false +} + +function parseCliState(values: Record): CliState { + const state: CliState = { + help: false, + logLevelRaw: undefined, + notifyUrlRaw: undefined, + settings: {}, + ui: false, + } + + for (const definition of CLI_OPTIONS) { + if (definition.type === 'string') { + const value = readStringOption(values, definition) + if (value !== undefined) { + definition.apply(value, state) + } + continue + } + + const value = readBooleanOption(values, definition) + if (value) { + definition.apply(value, state) + } + } + + return state +} + +function normalizeCliValues( + values: Record, +): Record { + const normalized: Record = {} + + for (const [key, value] of Object.entries(values)) { + if (typeof value === 'string' || typeof value === 'boolean' || value === undefined) { + normalized[key] = value + } + } + + return normalized +} + +function printHelp(): void { + const usageWidth = Math.max(...CLI_OPTIONS.map((option) => option.usage.length)) + const optionLines = CLI_OPTIONS.map( + (option) => ` ${option.usage.padEnd(usageWidth + 2)}${option.description}`, + ) + + console.log(`Usage: notify-url-relay [options] + +Options: +${optionLines.join('\n')} + +Environment fallback: + TRANSLOADIT_SECRET, TRANSLOADIT_NOTIFY_URL, TRANSLOADIT_LOG_LEVEL +`) +} + +const { values } = parseArgs({ + options: buildParseOptions(CLI_OPTIONS), +}) + +const cliState = parseCliState(normalizeCliValues(values)) + +if (cliState.help) { + printHelp() + process.exit(0) +} + +const secret = process.env.TRANSLOADIT_SECRET +if (!secret) { + fail('Missing secret. Set TRANSLOADIT_SECRET.') +} + +const settings = cliState.settings + +const rawLogLevel = cliState.logLevelRaw ?? process.env.TRANSLOADIT_LOG_LEVEL +const logLevel = rawLogLevel ? parseLogLevelOption(rawLogLevel) : undefined + +const tui = cliState.ui ? createTuiMode(logLevel) : null +const runtimeOptions: ProxyRuntimeOptions = tui + ? tui.runtimeOptions + : { + ...(typeof logLevel === 'number' ? { logLevel } : {}), + } + +const notifyUrlRaw = cliState.notifyUrlRaw ?? process.env.TRANSLOADIT_NOTIFY_URL +const notifyUrl = notifyUrlRaw ? parseNotifyUrlOption(notifyUrlRaw) : undefined + +const proxy = new TransloaditNotifyUrlProxy(secret, notifyUrl, runtimeOptions) +proxy.run(settings) + +if (tui) { + tui.start() +} + +const close = () => { + tui?.stop() + proxy.close() + process.exit(0) +} + +process.on('SIGINT', close) +process.on('SIGTERM', close) diff --git a/packages/notify-url-relay/src/index.ts b/packages/notify-url-relay/src/index.ts new file mode 100644 index 00000000..490c7331 --- /dev/null +++ b/packages/notify-url-relay/src/index.ts @@ -0,0 +1,900 @@ +import { once } from 'node:events' +import type { IncomingMessage, Server, ServerResponse } from 'node:http' +import { createServer } from 'node:http' +import { Readable } from 'node:stream' +import { pipeline } from 'node:stream/promises' +import type { ReadableStream as NodeReadableStream } from 'node:stream/web' + +import { SevLogger } from '@transloadit/sev-logger' +import { signParamsSync } from '@transloadit/utils/node' +import type { AssemblyStatus, assemblyStatusOkCodeSchema } from '@transloadit/zod/v4' +import { + assemblyStatusSchema, + getAssemblyStage, + getError, + getOk, + isAssemblyBusy, + isAssemblyOkStatus, + isAssemblyTerminalError, + isAssemblyTerminalOk, + parseAssemblyUrls, +} from '@transloadit/zod/v4' +import type { RetryContext } from 'p-retry' +import pRetry, { AbortError } from 'p-retry' + +const HOP_BY_HOP_HEADERS = new Set([ + 'connection', + 'keep-alive', + 'proxy-authenticate', + 'proxy-authorization', + 'te', + 'trailer', + 'transfer-encoding', + 'upgrade', +]) + +const DECODED_BODY_HEADERS = new Set(['content-encoding', 'content-length']) + +const MAX_CAPTURED_RESPONSE_BYTES = 512 * 1024 + +export type ProxyErrorCode = + | 'FORWARD_TIMEOUT' + | 'FORWARD_UPSTREAM_ERROR' + | 'POLL_TIMEOUT' + | 'NOTIFY_TIMEOUT' + +export interface ProxySettings { + target: string + port: number + forwardTimeoutMs: number + pollIntervalMs: number + pollMaxIntervalMs: number + pollBackoffFactor: number + pollRequestTimeoutMs: number + maxPollAttempts: number + maxInFlightPolls: number + notifyOnTerminalError: boolean + notifyTimeoutMs: number + notifyMaxAttempts: number + notifyIntervalMs: number + notifyMaxIntervalMs: number + notifyBackoffFactor: number +} + +export interface CounterMetricEvent { + kind: 'counter' + name: string + at: string + delta: number + total: number + tags?: Record +} + +export interface GaugeMetricEvent { + kind: 'gauge' + name: string + at: string + value: number +} + +export interface TimingMetricEvent { + kind: 'timing' + name: string + at: string + durationMs: number + count: number + minMs: number + maxMs: number + avgMs: number + tags?: Record +} + +export interface ProxyMetricsHooks { + onCounter?: (event: CounterMetricEvent) => void + onGauge?: (event: GaugeMetricEvent) => void + onTiming?: (event: TimingMetricEvent) => void +} + +export interface ProxyLogEvent { + at: string + level: 'debug' | 'info' | 'notice' | 'warn' | 'err' + message: string +} + +export interface ProxyRuntimeOptions { + logger?: SevLogger + logLevel?: number + metricsHooks?: ProxyMetricsHooks + onLog?: (event: ProxyLogEvent) => void +} + +type KnownAssemblyState = (typeof assemblyStatusOkCodeSchema.options)[number] + +export type AssemblyResponse = AssemblyStatus + +interface TimingAggregate { + count: number + totalMs: number + minMs: number + maxMs: number + lastMs: number +} + +const DEFAULT_SETTINGS: ProxySettings = { + target: 'https://api2.transloadit.com', + port: 8888, + forwardTimeoutMs: 15_000, + pollIntervalMs: 2_000, + pollMaxIntervalMs: 30_000, + pollBackoffFactor: 2, + pollRequestTimeoutMs: 15_000, + maxPollAttempts: 10, + maxInFlightPolls: 4, + notifyOnTerminalError: false, + notifyTimeoutMs: 15_000, + notifyMaxAttempts: 3, + notifyIntervalMs: 500, + notifyMaxIntervalMs: 5_000, + notifyBackoffFactor: 2, +} + +const DEFAULT_LOG_LEVEL = SevLogger.LEVEL.INFO + +class ProxyTimeoutError extends Error { + readonly code: ProxyErrorCode + + constructor(code: ProxyErrorCode, message: string) { + super(message) + this.code = code + } +} + +function isRecord(value: unknown): value is Record { + return typeof value === 'object' && value !== null +} + +function toErrorMessage(error: unknown): string { + if (error instanceof Error) { + return error.message + } + + return String(error) +} + +function supportsBody(method: string | undefined): boolean { + const normalized = (method ?? 'GET').toUpperCase() + return normalized !== 'GET' && normalized !== 'HEAD' +} + +function isAbortLikeError(error: unknown): boolean { + if (error instanceof DOMException && error.name === 'AbortError') { + return true + } + + return error instanceof Error && error.name === 'AbortError' +} + +function getErrorCode(error: unknown, fallback: ProxyErrorCode): ProxyErrorCode { + if (error instanceof ProxyTimeoutError) { + return error.code + } + + return fallback +} + +function getHeaderValues(name: string, headers: Headers): string[] { + const normalized = name.toLowerCase() + if (normalized !== 'set-cookie') { + return [] + } + + const maybeGetSetCookie = Reflect.get(headers, 'getSetCookie') + if (typeof maybeGetSetCookie === 'function') { + const values = maybeGetSetCookie.call(headers) + if (Array.isArray(values)) { + return values.filter((value): value is string => typeof value === 'string') + } + } + + const fallback = headers.get('set-cookie') + return fallback ? [fallback] : [] +} + +function isJsonResponse(contentType: string | null): boolean { + if (!contentType) { + return false + } + + return /application\/json|\+json/i.test(contentType) +} + +function createTimeoutSignal( + parentSignal: AbortSignal | null | undefined, + timeoutMs: number, +): { signal: AbortSignal; timeoutSignal: AbortSignal } { + const timeoutSignal = AbortSignal.timeout(timeoutMs) + const signal = parentSignal ? AbortSignal.any([parentSignal, timeoutSignal]) : timeoutSignal + return { signal, timeoutSignal } +} + +function getListeningPort(server: Server): number { + const address = server.address() + if (address === null || typeof address === 'string') { + throw new Error('Could not resolve server address') + } + + return address.port +} + +export function extractAssemblyUrl(body: string): string | null { + try { + const payload: unknown = JSON.parse(body) + return parseAssemblyUrls(payload).assemblyUrl + } catch { + return null + } +} + +export function getAssemblyState(payload: unknown): KnownAssemblyState { + if (!isRecord(payload)) { + throw new Error('No ok field found in Assembly response.') + } + + const ok = typeof payload.ok === 'string' ? payload.ok : undefined + if (!isAssemblyOkStatus(ok)) { + throw new Error(`Unknown Assembly state found: ${String(payload.ok)}`) + } + + return ok +} + +export function getSignature(secret: string, toSign: string): string { + return signParamsSync(toSign, secret, 'sha384') +} + +export function parseAssemblyResponse(payload: unknown): AssemblyResponse { + const parsed = assemblyStatusSchema.safeParse(payload) + if (!parsed.success) { + throw new Error('Invalid assembly response payload.') + } + + return parsed.data +} + +export class TransloaditNotifyUrlProxy { + private server: Server | null = null + private isClosing = false + + private readonly secret: string + private readonly notifyUrl: string + private readonly logger: SevLogger + private readonly metricsHooks: ProxyMetricsHooks | undefined + private readonly onLog: ((event: ProxyLogEvent) => void) | undefined + private readonly defaults: ProxySettings + private settings: ProxySettings + + private readonly pendingAssemblyUrls = new Set() + private readonly activePolls = new Map>() + private readonly pollControllers = new Map() + private activePollCount = 0 + + private readonly counters = new Map() + private readonly gauges = new Map() + private readonly timings = new Map() + + constructor( + secret: string, + notifyUrl = 'http://127.0.0.1:3000/transloadit', + runtimeOptions: ProxyRuntimeOptions = {}, + ) { + this.secret = secret || '' + this.notifyUrl = notifyUrl + this.metricsHooks = runtimeOptions.metricsHooks + this.onLog = runtimeOptions.onLog + + this.defaults = { ...DEFAULT_SETTINGS } + this.settings = { ...DEFAULT_SETTINGS } + this.logger = + runtimeOptions.logger ?? + new SevLogger({ + breadcrumbs: ['notify-url-relay'], + level: runtimeOptions.logLevel ?? DEFAULT_LOG_LEVEL, + }) + + if (runtimeOptions.logger && typeof runtimeOptions.logLevel === 'number') { + this.logger.update({ level: runtimeOptions.logLevel }) + } + } + + run(opts: Partial = {}): void { + if (this.server !== null) { + this.close() + } + + this.isClosing = false + this.settings = { ...this.defaults, ...opts } + + this.setGauge('poll.in_flight', 0) + this.setGauge('poll.pending', 0) + + this.server = createServer((request, response) => { + void this.handleForward(request, response) + }) + + const listeningServer = this.server + listeningServer.listen(this.settings.port, () => { + this.log( + 'notice', + `Listening on http://localhost:${getListeningPort(listeningServer)}, forwarding to ${this.settings.target}, notifying ${this.notifyUrl}`, + ) + }) + } + + async waitForListenPort(): Promise { + if (this.server === null) { + throw new Error('Proxy server is not running.') + } + + if (!this.server.listening) { + await once(this.server, 'listening') + } + + return getListeningPort(this.server) + } + + close(): void { + this.isClosing = true + + this.server?.close() + this.server = null + + for (const [assemblyUrl, controller] of this.pollControllers) { + controller.abort(new Error(`Proxy closed while polling ${assemblyUrl}`)) + } + + this.pollControllers.clear() + this.pendingAssemblyUrls.clear() + this.activePolls.clear() + this.activePollCount = 0 + + this.setGauge('poll.in_flight', 0) + this.setGauge('poll.pending', 0) + } + + private log(level: ProxyLogEvent['level'], message: string): void { + if (level === 'debug') { + this.logger.debug(message) + } else if (level === 'info') { + this.logger.info(message) + } else if (level === 'notice') { + this.logger.notice(message) + } else if (level === 'warn') { + this.logger.warn(message) + } else { + this.logger.err(message) + } + + this.onLog?.({ + at: new Date().toISOString(), + level, + message, + }) + } + + private incrementCounter(name: string, delta = 1, tags?: Record): void { + const total = (this.counters.get(name) ?? 0) + delta + this.counters.set(name, total) + + this.metricsHooks?.onCounter?.({ + kind: 'counter', + name, + at: new Date().toISOString(), + delta, + total, + ...(tags ? { tags } : {}), + }) + } + + private setGauge(name: string, value: number): void { + this.gauges.set(name, value) + + this.metricsHooks?.onGauge?.({ + kind: 'gauge', + name, + at: new Date().toISOString(), + value, + }) + } + + private observeTiming(name: string, durationMs: number, tags?: Record): void { + let stats = this.timings.get(name) + if (!stats) { + stats = { + count: 1, + totalMs: durationMs, + minMs: durationMs, + maxMs: durationMs, + lastMs: durationMs, + } + this.timings.set(name, stats) + } else { + stats.count += 1 + stats.totalMs += durationMs + stats.minMs = Math.min(stats.minMs, durationMs) + stats.maxMs = Math.max(stats.maxMs, durationMs) + stats.lastMs = durationMs + } + + this.metricsHooks?.onTiming?.({ + kind: 'timing', + name, + at: new Date().toISOString(), + durationMs, + count: stats.count, + minMs: stats.minMs, + maxMs: stats.maxMs, + avgMs: stats.totalMs / stats.count, + ...(tags ? { tags } : {}), + }) + } + + private async handleForward(request: IncomingMessage, response: ServerResponse): Promise { + const requestStartedAt = Date.now() + this.incrementCounter('forward.requests_total') + + const proxyController = new AbortController() + request.on('aborted', () => { + proxyController.abort(new Error('Client aborted request')) + }) + + try { + const targetUrl = this.resolveTargetUrl(request.url) + const requestBody = supportsBody(request.method) + ? (Readable.toWeb(request) as unknown as ReadableStream) + : undefined + + const fetchInit: RequestInit & { duplex?: 'half' } = { + method: request.method ?? 'GET', + headers: this.createForwardHeaders(request), + redirect: 'manual', + signal: proxyController.signal, + } + + if (requestBody) { + fetchInit.body = requestBody + fetchInit.duplex = 'half' + } + + const upstreamResponse = await this.fetchWithTimeout( + targetUrl, + fetchInit, + this.settings.forwardTimeoutMs, + 'FORWARD_TIMEOUT', + ) + + await this.pipeForwardResponse(response, upstreamResponse) + + this.incrementCounter('forward.requests_ok') + this.observeTiming('forward.request_duration_ms', Date.now() - requestStartedAt) + } catch (error) { + if (isAbortLikeError(error)) { + return + } + + const code = getErrorCode(error, 'FORWARD_UPSTREAM_ERROR') + const statusCode = code === 'FORWARD_TIMEOUT' ? 504 : 502 + + this.incrementCounter('forward.requests_error', 1, { code }) + this.observeTiming('forward.request_duration_ms', Date.now() - requestStartedAt, { code }) + this.writeErrorResponse(response, statusCode, code, toErrorMessage(error)) + this.log('err', `Forward request failed with ${code}: ${toErrorMessage(error)}`) + } + } + + private resolveTargetUrl(requestUrl: string | undefined): string { + const path = requestUrl ?? '/' + if (/^https?:\/\//i.test(path)) { + throw new Error(`Absolute request URL is not supported: ${path}`) + } + if (path.startsWith('//')) { + throw new Error(`Protocol-relative request URL is not supported: ${path}`) + } + if (/^(?:\[[^\]]+\]|[a-z0-9.-]+):\d+$/i.test(path)) { + throw new Error(`Authority-form request URL is not supported: ${path}`) + } + + return new URL(path, this.settings.target).toString() + } + + private createForwardHeaders(request: IncomingMessage): Headers { + const headers = new Headers() + + for (const [name, value] of Object.entries(request.headers)) { + const headerName = name.toLowerCase() + if ( + HOP_BY_HOP_HEADERS.has(headerName) || + headerName === 'host' || + headerName === 'accept-encoding' + ) { + continue + } + if (value === undefined) { + continue + } + + if (Array.isArray(value)) { + for (const item of value) { + headers.append(name, item) + } + } else { + headers.set(name, value) + } + } + + if (request.socket.remoteAddress) { + headers.set('x-forwarded-for', request.socket.remoteAddress) + } + if (typeof request.headers.host === 'string') { + headers.set('x-forwarded-host', request.headers.host) + } + headers.set('accept-encoding', 'identity') + + return headers + } + + private async pipeForwardResponse( + response: ServerResponse, + upstreamResponse: Response, + ): Promise { + response.statusCode = upstreamResponse.status + response.statusMessage = upstreamResponse.statusText + + for (const [name, value] of upstreamResponse.headers) { + const headerName = name.toLowerCase() + if ( + HOP_BY_HOP_HEADERS.has(headerName) || + headerName === 'set-cookie' || + DECODED_BODY_HEADERS.has(headerName) + ) { + continue + } + response.setHeader(name, value) + } + + const setCookies = getHeaderValues('set-cookie', upstreamResponse.headers) + if (setCookies.length > 0) { + response.setHeader('set-cookie', setCookies) + } + + if (!upstreamResponse.body) { + response.end() + return + } + + const shouldCapture = isJsonResponse(upstreamResponse.headers.get('content-type')) + + const upstreamBodyNode = Readable.fromWeb( + upstreamResponse.body as unknown as NodeReadableStream, + ) + const capturedChunks: Buffer[] = [] + let capturedBytes = 0 + + if (shouldCapture) { + upstreamBodyNode.on('data', (chunk: Buffer | string | Uint8Array) => { + if (capturedBytes >= MAX_CAPTURED_RESPONSE_BYTES) { + return + } + + const chunkBuffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk) + const remaining = MAX_CAPTURED_RESPONSE_BYTES - capturedBytes + const toCapture = + chunkBuffer.length <= remaining ? chunkBuffer : chunkBuffer.subarray(0, remaining) + + capturedChunks.push(Buffer.from(toCapture)) + capturedBytes += toCapture.length + }) + } + + await pipeline(upstreamBodyNode, response) + + if (shouldCapture && capturedBytes > 0) { + const body = Buffer.concat(capturedChunks, capturedBytes) + this.maybePollAssemblyFromBody(body) + } + } + + private writeErrorResponse( + response: ServerResponse, + statusCode: number, + code: ProxyErrorCode, + message: string, + ): void { + if (response.headersSent) { + if (!response.writableEnded) { + response.end() + } + return + } + + response.writeHead(statusCode, { + 'content-type': 'application/json; charset=utf-8', + 'x-notify-proxy-error-code': code, + }) + + response.end( + JSON.stringify({ + error: code, + message, + }), + ) + } + + private maybePollAssemblyFromBody(body: Buffer): void { + const assemblyUrl = extractAssemblyUrl(body.toString('utf-8')) + if (!assemblyUrl) { + return + } + + this.enqueueAssemblyPoll(assemblyUrl) + } + + private enqueueAssemblyPoll(assemblyUrl: string): void { + if (this.isClosing) { + return + } + + if (this.pendingAssemblyUrls.has(assemblyUrl) || this.activePolls.has(assemblyUrl)) { + this.incrementCounter('poll.dedupe_skipped_total') + this.log('debug', `Skipping duplicate poll registration for ${assemblyUrl}`) + return + } + + this.pendingAssemblyUrls.add(assemblyUrl) + this.setGauge('poll.pending', this.pendingAssemblyUrls.size) + this.incrementCounter('poll.enqueued_total') + this.log('info', `Queued poll for ${assemblyUrl}`) + + this.drainPollQueue() + } + + private drainPollQueue(): void { + if (this.isClosing) { + return + } + + while (this.activePollCount < this.settings.maxInFlightPolls) { + const nextEntry = this.pendingAssemblyUrls.values().next() + if (nextEntry.done) { + break + } + const next = nextEntry.value + + this.pendingAssemblyUrls.delete(next) + this.setGauge('poll.pending', this.pendingAssemblyUrls.size) + + const controller = new AbortController() + this.pollControllers.set(next, controller) + this.activePollCount += 1 + this.setGauge('poll.in_flight', this.activePollCount) + + const pollPromise = this.pollAssembly(next, controller.signal).finally(() => { + if (this.activePolls.get(next) !== pollPromise) { + return + } + + this.activePolls.delete(next) + this.pollControllers.delete(next) + this.activePollCount = Math.max(0, this.activePollCount - 1) + this.setGauge('poll.in_flight', this.activePollCount) + + if (!this.isClosing) { + this.drainPollQueue() + } + }) + + this.activePolls.set(next, pollPromise) + } + } + + private async pollAssembly(assemblyUrl: string, signal: AbortSignal): Promise { + const retries = Math.max(this.settings.maxPollAttempts - 1, 0) + const pollStartedAt = Date.now() + + this.incrementCounter('poll.started_total') + + try { + const response = await pRetry(() => this.checkAssembly(assemblyUrl, signal), { + retries, + minTimeout: this.settings.pollIntervalMs, + maxTimeout: this.settings.pollMaxIntervalMs, + factor: this.settings.pollBackoffFactor, + randomize: true, + signal, + onFailedAttempt: (retryContext: RetryContext) => { + if (retryContext.retriesLeft <= 0) { + return + } + + this.incrementCounter('poll.retry_total') + this.log( + 'warn', + `Attempt ${retryContext.attemptNumber}/${this.settings.maxPollAttempts} failed for ${assemblyUrl}: ${retryContext.error.message}`, + ) + }, + }) + + await this.notifyWithRetry(response, signal) + + this.incrementCounter('poll.completed_total') + this.observeTiming('poll.duration_ms', Date.now() - pollStartedAt) + } catch (error) { + if (error instanceof AbortError) { + this.incrementCounter('poll.aborted_total') + this.log('notice', error.message) + return + } + + if (signal.aborted || this.isClosing || isAbortLikeError(error)) { + this.incrementCounter('poll.cancelled_total') + this.log('debug', `Polling cancelled for ${assemblyUrl}`) + return + } + + const code = getErrorCode(error, 'POLL_TIMEOUT') + this.incrementCounter('poll.failed_total', 1, { code }) + this.observeTiming('poll.duration_ms', Date.now() - pollStartedAt, { code }) + this.log('err', `No attempts left for ${assemblyUrl}: ${toErrorMessage(error)}`) + } + } + + private async checkAssembly(assemblyUrl: string, signal: AbortSignal): Promise { + this.incrementCounter('poll.fetch_attempt_total') + + const response = await this.fetchWithTimeout( + assemblyUrl, + { signal }, + this.settings.pollRequestTimeoutMs, + 'POLL_TIMEOUT', + ) + + if (!response.ok) { + throw new Error(`Assembly poll returned HTTP ${response.status}`) + } + + const assembly = parseAssemblyResponse(await response.json()) + + if (isAssemblyTerminalError(assembly)) { + const errorCode = getError(assembly) ?? 'UNKNOWN_ERROR' + this.incrementCounter('poll.terminal_error_total', 1, { errorCode }) + + if (this.settings.notifyOnTerminalError) { + this.log( + 'notice', + `${assemblyUrl} reached terminal error state ${errorCode}; notifying because notifyOnTerminalError=true.`, + ) + return assembly + } + + throw new AbortError(`${assemblyUrl} reached terminal error state ${errorCode}.`) + } + + if (isAssemblyTerminalOk(assembly)) { + this.incrementCounter('poll.terminal_ok_total', 1, { state: getOk(assembly) ?? 'UNKNOWN' }) + this.log('info', `${assemblyUrl} reached terminal state ${getOk(assembly)}.`) + return assembly + } + + if (isAssemblyBusy(assembly)) { + const stage = getAssemblyStage(assembly) + if (stage === 'uploading') { + throw new Error(`${assemblyUrl} is still uploading.`) + } + if (stage === 'processing') { + throw new Error(`${assemblyUrl} is still executing.`) + } + throw new Error(`${assemblyUrl} is still replaying.`) + } + + throw new Error(`${assemblyUrl} returned a non-terminal assembly state.`) + } + + private async notifyWithRetry(response: AssemblyResponse, signal: AbortSignal): Promise { + const retries = Math.max(this.settings.notifyMaxAttempts - 1, 0) + + await pRetry(() => this.notifyOnce(response, signal), { + retries, + minTimeout: this.settings.notifyIntervalMs, + maxTimeout: this.settings.notifyMaxIntervalMs, + factor: this.settings.notifyBackoffFactor, + randomize: true, + signal, + onFailedAttempt: (retryContext: RetryContext) => { + if (signal.aborted || isAbortLikeError(retryContext.error)) { + return + } + + if (retryContext.retriesLeft <= 0) { + return + } + + this.incrementCounter('notify.retry_total') + this.log( + 'warn', + `Notify retry ${retryContext.attemptNumber}/${this.settings.notifyMaxAttempts} failed: ${retryContext.error.message}`, + ) + }, + shouldRetry: (retryContext: RetryContext) => { + if (signal.aborted || isAbortLikeError(retryContext.error)) { + return false + } + + return true + }, + }) + } + + private async notifyOnce(response: AssemblyResponse, signal: AbortSignal): Promise { + const notifyStartedAt = Date.now() + this.incrementCounter('notify.attempt_total') + + const transloadit = JSON.stringify(response) + const signature = getSignature(this.secret, transloadit) + + const notifyResponse = await this.fetchWithTimeout( + this.notifyUrl, + { + method: 'POST', + headers: { + 'content-type': 'application/x-www-form-urlencoded; charset=utf-8', + }, + body: new URLSearchParams({ + transloadit, + signature, + }), + signal, + }, + this.settings.notifyTimeoutMs, + 'NOTIFY_TIMEOUT', + ) + + if (!notifyResponse.ok) { + this.incrementCounter('notify.failed_total', 1, { code: `HTTP_${notifyResponse.status}` }) + this.observeTiming('notify.duration_ms', Date.now() - notifyStartedAt, { + code: `HTTP_${notifyResponse.status}`, + }) + throw new Error(`Notify URL returned HTTP ${notifyResponse.status}`) + } + + this.incrementCounter('notify.success_total') + this.observeTiming('notify.duration_ms', Date.now() - notifyStartedAt) + this.log('notice', `Notify payload sent to ${this.notifyUrl}`) + } + + private async fetchWithTimeout( + url: string, + init: RequestInit, + timeoutMs: number, + timeoutCode: ProxyErrorCode, + ): Promise { + const timeoutSignal = createTimeoutSignal(init.signal, timeoutMs) + + const fetchInit: RequestInit = { + ...init, + signal: timeoutSignal.signal, + } + + try { + return await fetch(url, fetchInit) + } catch (error) { + if (timeoutSignal.timeoutSignal.aborted) { + throw new ProxyTimeoutError(timeoutCode, `${timeoutCode} after ${timeoutMs}ms`) + } + + if (timeoutSignal.signal.reason instanceof ProxyTimeoutError) { + throw timeoutSignal.signal.reason + } + + throw error + } + } +} diff --git a/packages/notify-url-relay/test/behavior.test.ts b/packages/notify-url-relay/test/behavior.test.ts new file mode 100644 index 00000000..ac1fff48 --- /dev/null +++ b/packages/notify-url-relay/test/behavior.test.ts @@ -0,0 +1,330 @@ +import { createServer } from 'node:http' +import { setTimeout as delay } from 'node:timers/promises' + +import { describe, expect, it } from 'vitest' + +import { getSignature, TransloaditNotifyUrlProxy } from '../src/index.ts' +import { + closeServer, + json, + listen, + parseJsonRecord, + readBody, + startRelay, + waitFor, +} from './helpers.ts' + +describe('proxy behavior guards', () => { + it('dedupes duplicate assembly URLs and avoids duplicate poll loops', async () => { + const secret = 'foo_secret' + let upstreamPort = 0 + let pollCount = 0 + let notifyCount = 0 + let resolveNotify: (() => void) | null = null + const notifyReceived = new Promise((resolve) => { + resolveNotify = resolve + }) + + const notifyServer = createServer(async (request, response) => { + if (request.method !== 'POST' || request.url !== '/transloadit') { + response.writeHead(404) + response.end() + return + } + + notifyCount += 1 + await readBody(request) + response.writeHead(200) + response.end('ok') + resolveNotify?.() + }) + + const upstreamServer = createServer((request, response) => { + if (request.method === 'POST' && request.url === '/assemblies') { + json(response, 200, { + assembly_url: `http://127.0.0.1:${upstreamPort}/assembly/dupe`, + }) + return + } + + if (request.method === 'GET' && request.url === '/assembly/dupe') { + pollCount += 1 + if (pollCount === 1) { + json(response, 200, { ok: 'ASSEMBLY_EXECUTING' }) + return + } + + json(response, 200, { ok: 'ASSEMBLY_COMPLETED', assembly_id: 'dupe' }) + return + } + + response.writeHead(404) + response.end() + }) + + const notifyPort = await listen(notifyServer) + upstreamPort = await listen(upstreamServer) + + const proxy = new TransloaditNotifyUrlProxy( + secret, + `http://127.0.0.1:${notifyPort}/transloadit`, + { logLevel: 0 }, + ) + const proxyPort = await startRelay(proxy, { + target: `http://127.0.0.1:${upstreamPort}`, + pollIntervalMs: 10, + pollMaxIntervalMs: 100, + maxPollAttempts: 5, + maxInFlightPolls: 1, + }) + + try { + const [responseA, responseB] = await Promise.all([ + fetch(`http://127.0.0.1:${proxyPort}/assemblies`, { + method: 'POST', + body: new URLSearchParams({ params: '{}' }), + }), + fetch(`http://127.0.0.1:${proxyPort}/assemblies`, { + method: 'POST', + body: new URLSearchParams({ params: '{}' }), + }), + ]) + + expect(responseA.status).toBe(200) + expect(responseB.status).toBe(200) + + await Promise.race([ + notifyReceived, + delay(3_000).then(() => { + throw new Error('Timed out waiting for notify request') + }), + ]) + + expect(pollCount).toBe(2) + expect(notifyCount).toBe(1) + } finally { + proxy.close() + await closeServer(notifyServer) + await closeServer(upstreamServer) + } + }, 10_000) + + it('cancels polling on close()', async () => { + const secret = 'foo_secret' + let upstreamPort = 0 + let pollCount = 0 + + const notifyServer = createServer(async (request, response) => { + await readBody(request) + response.writeHead(200) + response.end('ok') + }) + + const upstreamServer = createServer((request, response) => { + if (request.method === 'POST' && request.url === '/assemblies') { + json(response, 200, { + assembly_url: `http://127.0.0.1:${upstreamPort}/assembly/slow`, + }) + return + } + + if (request.method === 'GET' && request.url === '/assembly/slow') { + pollCount += 1 + json(response, 200, { ok: 'ASSEMBLY_EXECUTING' }) + return + } + + response.writeHead(404) + response.end() + }) + + const notifyPort = await listen(notifyServer) + upstreamPort = await listen(upstreamServer) + + const proxy = new TransloaditNotifyUrlProxy( + secret, + `http://127.0.0.1:${notifyPort}/transloadit`, + { logLevel: 0 }, + ) + const proxyPort = await startRelay(proxy, { + target: `http://127.0.0.1:${upstreamPort}`, + pollIntervalMs: 20, + pollMaxIntervalMs: 40, + maxPollAttempts: 100, + maxInFlightPolls: 1, + }) + + try { + const createResponse = await fetch(`http://127.0.0.1:${proxyPort}/assemblies`, { + method: 'POST', + body: new URLSearchParams({ params: '{}' }), + }) + expect(createResponse.status).toBe(200) + + await waitFor(() => pollCount >= 1, 2_000, 5, 'Timed out waiting for first polling attempt') + + proxy.close() + const countAfterClose = pollCount + await delay(150) + + expect(pollCount).toBe(countAfterClose) + } finally { + proxy.close() + await closeServer(notifyServer) + await closeServer(upstreamServer) + } + }, 10_000) + + it('does not notify on terminal error by default', async () => { + const secret = 'foo_secret' + let upstreamPort = 0 + let notifyCount = 0 + + const notifyServer = createServer(async (request, response) => { + notifyCount += 1 + await readBody(request) + response.writeHead(200) + response.end('ok') + }) + + const upstreamServer = createServer((request, response) => { + if (request.method === 'POST' && request.url === '/assemblies') { + json(response, 200, { + assembly_url: `http://127.0.0.1:${upstreamPort}/assembly/error`, + }) + return + } + + if (request.method === 'GET' && request.url === '/assembly/error') { + json(response, 200, { + error: 'ASSEMBLY_CRASHED', + }) + return + } + + response.writeHead(404) + response.end() + }) + + const notifyPort = await listen(notifyServer) + upstreamPort = await listen(upstreamServer) + + const proxy = new TransloaditNotifyUrlProxy( + secret, + `http://127.0.0.1:${notifyPort}/transloadit`, + { logLevel: 0 }, + ) + const proxyPort = await startRelay(proxy, { + target: `http://127.0.0.1:${upstreamPort}`, + pollIntervalMs: 10, + pollMaxIntervalMs: 100, + maxPollAttempts: 5, + }) + + try { + const createResponse = await fetch(`http://127.0.0.1:${proxyPort}/assemblies`, { + method: 'POST', + body: new URLSearchParams({ params: '{}' }), + }) + expect(createResponse.status).toBe(200) + + await delay(200) + expect(notifyCount).toBe(0) + } finally { + proxy.close() + await closeServer(notifyServer) + await closeServer(upstreamServer) + } + }, 10_000) + + it('notifies when notifyOnTerminalError is enabled', async () => { + const secret = 'foo_secret' + let upstreamPort = 0 + let notifyTransloadit: string | null = null + let notifySignature: string | null = null + let resolveNotify: (() => void) | null = null + const notifyReceived = new Promise((resolve) => { + resolveNotify = resolve + }) + + const notifyServer = createServer(async (request, response) => { + if (request.method !== 'POST' || request.url !== '/transloadit') { + response.writeHead(404) + response.end() + return + } + + const payload = new URLSearchParams(await readBody(request)) + notifyTransloadit = payload.get('transloadit') + notifySignature = payload.get('signature') + response.writeHead(200) + response.end('ok') + resolveNotify?.() + }) + + const upstreamServer = createServer((request, response) => { + if (request.method === 'POST' && request.url === '/assemblies') { + json(response, 200, { + assembly_url: `http://127.0.0.1:${upstreamPort}/assembly/error`, + }) + return + } + + if (request.method === 'GET' && request.url === '/assembly/error') { + json(response, 200, { + error: 'ASSEMBLY_CRASHED', + }) + return + } + + response.writeHead(404) + response.end() + }) + + const notifyPort = await listen(notifyServer) + upstreamPort = await listen(upstreamServer) + + const proxy = new TransloaditNotifyUrlProxy( + secret, + `http://127.0.0.1:${notifyPort}/transloadit`, + { logLevel: 0 }, + ) + const proxyPort = await startRelay(proxy, { + target: `http://127.0.0.1:${upstreamPort}`, + pollIntervalMs: 10, + pollMaxIntervalMs: 100, + maxPollAttempts: 5, + notifyOnTerminalError: true, + }) + + try { + const createResponse = await fetch(`http://127.0.0.1:${proxyPort}/assemblies`, { + method: 'POST', + body: new URLSearchParams({ params: '{}' }), + }) + expect(createResponse.status).toBe(200) + + await Promise.race([ + notifyReceived, + delay(3_000).then(() => { + throw new Error('Timed out waiting for terminal-error notify request') + }), + ]) + + expect(notifyTransloadit).toBeTypeOf('string') + expect(notifySignature).toBeTypeOf('string') + + if (notifyTransloadit === null || notifySignature === null) { + throw new Error('Missing notify transloadit/signature payload') + } + + expect(notifySignature).toBe(getSignature(secret, notifyTransloadit)) + const payload = parseJsonRecord(notifyTransloadit) + expect(payload.error).toBe('ASSEMBLY_CRASHED') + } finally { + proxy.close() + await closeServer(notifyServer) + await closeServer(upstreamServer) + } + }, 10_000) +}) diff --git a/packages/notify-url-relay/test/chaos.test.ts b/packages/notify-url-relay/test/chaos.test.ts new file mode 100644 index 00000000..3ece394a --- /dev/null +++ b/packages/notify-url-relay/test/chaos.test.ts @@ -0,0 +1,217 @@ +import { createServer } from 'node:http' +import { setTimeout as delay } from 'node:timers/promises' + +import { describe, expect, it } from 'vitest' +import type { CounterMetricEvent } from '../src/index.ts' +import { TransloaditNotifyUrlProxy } from '../src/index.ts' +import { closeServer, json, listen, readBody, startRelay, waitFor } from './helpers.ts' + +describe('proxy chaos retries', () => { + it('handles flaky polling upstream and still notifies', async () => { + const counters: Record = {} + const onCounter = (event: CounterMetricEvent): void => { + counters[event.name] = event.total + } + + let upstreamPort = 0 + let pollCount = 0 + let notifyCount = 0 + let resolveNotify: (() => void) | null = null + const notifyReceived = new Promise((resolve) => { + resolveNotify = resolve + }) + + const notifyServer = createServer(async (request, response) => { + if (request.method !== 'POST' || request.url !== '/transloadit') { + response.writeHead(404) + response.end() + return + } + + notifyCount += 1 + await readBody(request) + response.writeHead(200) + response.end('ok') + resolveNotify?.() + }) + + const upstreamServer = createServer((request, response) => { + if (request.method === 'POST' && request.url === '/assemblies') { + json(response, 200, { + assembly_url: `http://127.0.0.1:${upstreamPort}/assembly/flaky`, + }) + return + } + + if (request.method === 'GET' && request.url === '/assembly/flaky') { + pollCount += 1 + if (pollCount <= 2) { + json(response, 500, { error: 'TEMP_ERROR' }) + return + } + if (pollCount <= 4) { + json(response, 200, { ok: 'ASSEMBLY_EXECUTING' }) + return + } + + json(response, 200, { ok: 'ASSEMBLY_COMPLETED', assembly_id: 'flaky' }) + return + } + + response.writeHead(404) + response.end() + }) + + const notifyPort = await listen(notifyServer) + upstreamPort = await listen(upstreamServer) + + const proxy = new TransloaditNotifyUrlProxy( + 'secret', + `http://127.0.0.1:${notifyPort}/transloadit`, + { + logLevel: 0, + metricsHooks: { onCounter }, + }, + ) + const proxyPort = await startRelay(proxy, { + target: `http://127.0.0.1:${upstreamPort}`, + pollIntervalMs: 10, + pollMaxIntervalMs: 50, + maxPollAttempts: 8, + pollRequestTimeoutMs: 300, + }) + + try { + const createResponse = await fetch(`http://127.0.0.1:${proxyPort}/assemblies`, { + method: 'POST', + body: new URLSearchParams({ params: '{}' }), + }) + expect(createResponse.status).toBe(200) + + await Promise.race([ + notifyReceived, + delay(5_000).then(() => { + throw new Error('Timed out waiting for flaky polling notify') + }), + ]) + await waitFor( + () => (counters['notify.success_total'] ?? 0) >= 1, + 1_000, + 10, + 'Timed out waiting for notify.success_total update', + ) + + expect(notifyCount).toBe(1) + expect(pollCount).toBeGreaterThanOrEqual(5) + expect(counters['poll.retry_total'] ?? 0).toBeGreaterThanOrEqual(3) + expect(counters['notify.success_total'] ?? 0).toBe(1) + } finally { + proxy.close() + await closeServer(notifyServer) + await closeServer(upstreamServer) + } + }, 12_000) + + it('retries flaky notify endpoint until success', async () => { + const counters: Record = {} + const onCounter = (event: CounterMetricEvent): void => { + counters[event.name] = event.total + } + + let upstreamPort = 0 + let notifyAttempts = 0 + let resolveNotify: (() => void) | null = null + const notifyDone = new Promise((resolve) => { + resolveNotify = resolve + }) + + const notifyServer = createServer(async (request, response) => { + if (request.method !== 'POST' || request.url !== '/transloadit') { + response.writeHead(404) + response.end() + return + } + + notifyAttempts += 1 + await readBody(request) + + if (notifyAttempts < 3) { + response.writeHead(500) + response.end('retry me') + return + } + + response.writeHead(200) + response.end('ok') + resolveNotify?.() + }) + + const upstreamServer = createServer((request, response) => { + if (request.method === 'POST' && request.url === '/assemblies') { + json(response, 200, { + assembly_url: `http://127.0.0.1:${upstreamPort}/assembly/notify-flaky`, + }) + return + } + + if (request.method === 'GET' && request.url === '/assembly/notify-flaky') { + json(response, 200, { ok: 'ASSEMBLY_COMPLETED', assembly_id: 'notify-flaky' }) + return + } + + response.writeHead(404) + response.end() + }) + + const notifyPort = await listen(notifyServer) + upstreamPort = await listen(upstreamServer) + + const proxy = new TransloaditNotifyUrlProxy( + 'secret', + `http://127.0.0.1:${notifyPort}/transloadit`, + { + logLevel: 0, + metricsHooks: { onCounter }, + }, + ) + const proxyPort = await startRelay(proxy, { + target: `http://127.0.0.1:${upstreamPort}`, + pollIntervalMs: 5, + pollMaxIntervalMs: 20, + notifyIntervalMs: 10, + notifyMaxIntervalMs: 40, + notifyBackoffFactor: 2, + notifyMaxAttempts: 5, + notifyTimeoutMs: 300, + }) + + try { + const createResponse = await fetch(`http://127.0.0.1:${proxyPort}/assemblies`, { + method: 'POST', + body: new URLSearchParams({ params: '{}' }), + }) + expect(createResponse.status).toBe(200) + + await Promise.race([ + notifyDone, + delay(5_000).then(() => { + throw new Error('Timed out waiting for flaky notify success') + }), + ]) + await waitFor( + () => (counters['notify.success_total'] ?? 0) >= 1, + 1_000, + 10, + 'Timed out waiting for notify.success_total update', + ) + + expect(notifyAttempts).toBe(3) + expect(counters['notify.retry_total'] ?? 0).toBeGreaterThanOrEqual(2) + expect(counters['notify.success_total'] ?? 0).toBe(1) + } finally { + proxy.close() + await closeServer(notifyServer) + await closeServer(upstreamServer) + } + }, 12_000) +}) diff --git a/packages/notify-url-relay/test/cli.test.ts b/packages/notify-url-relay/test/cli.test.ts new file mode 100644 index 00000000..ab5b2fbd --- /dev/null +++ b/packages/notify-url-relay/test/cli.test.ts @@ -0,0 +1,26 @@ +import { spawnSync } from 'node:child_process' +import { fileURLToPath } from 'node:url' + +import { describe, expect, it } from 'vitest' + +const CLI_PATH = fileURLToPath(new URL('../src/cli.ts', import.meta.url)) + +describe('cli', () => { + it('accepts bracketed IPv6 loopback notify URL over HTTP', () => { + const result = spawnSync( + process.execPath, + [CLI_PATH, '--help', '--notifyUrl', 'http://[::1]:3000/transloadit'], + { + encoding: 'utf8', + env: { + ...process.env, + TRANSLOADIT_SECRET: 'test_secret', + }, + }, + ) + + expect(result.status).toBe(0) + expect(result.stderr).toBe('') + expect(result.stdout).toContain('Usage: notify-url-relay [options]') + }) +}) diff --git a/packages/notify-url-relay/test/e2e.test.ts b/packages/notify-url-relay/test/e2e.test.ts new file mode 100644 index 00000000..7afeec64 --- /dev/null +++ b/packages/notify-url-relay/test/e2e.test.ts @@ -0,0 +1,105 @@ +import { createServer } from 'node:http' +import { setTimeout as delay } from 'node:timers/promises' + +import { describe, expect, it } from 'vitest' + +import { getSignature, TransloaditNotifyUrlProxy } from '../src/index.ts' +import { closeServer, json, listen, parseJsonRecord, readBody, startRelay } from './helpers.ts' + +describe('proxy e2e', () => { + it('proxies assembly creation, polls assembly status, and notifies target', async () => { + const secret = 'foo_secret' + let upstreamPort = 0 + let pollCount = 0 + + let notifyTransloadit: string | null = null + let notifySignature: string | null = null + let resolveNotify: (() => void) | null = null + const notifyReceived = new Promise((resolve) => { + resolveNotify = resolve + }) + + const notifyServer = createServer(async (request, response) => { + if (request.method !== 'POST' || request.url !== '/transloadit') { + response.writeHead(404) + response.end() + return + } + + const payload = new URLSearchParams(await readBody(request)) + notifyTransloadit = payload.get('transloadit') + notifySignature = payload.get('signature') + response.writeHead(200) + response.end('ok') + resolveNotify?.() + }) + + const upstreamServer = createServer((request, response) => { + if (request.method === 'POST' && request.url === '/assemblies') { + json(response, 200, { + assembly_url: `http://127.0.0.1:${upstreamPort}/assembly/123`, + }) + return + } + + if (request.method === 'GET' && request.url === '/assembly/123') { + pollCount += 1 + if (pollCount === 1) { + json(response, 200, { ok: 'ASSEMBLY_EXECUTING' }) + return + } + + json(response, 200, { ok: 'ASSEMBLY_COMPLETED', assembly_id: '123' }) + return + } + + response.writeHead(404) + response.end() + }) + + const notifyPort = await listen(notifyServer) + upstreamPort = await listen(upstreamServer) + + const proxy = new TransloaditNotifyUrlProxy( + secret, + `http://127.0.0.1:${notifyPort}/transloadit`, + ) + const proxyPort = await startRelay(proxy, { + target: `http://127.0.0.1:${upstreamPort}`, + pollIntervalMs: 5, + maxPollAttempts: 5, + }) + + try { + const createResponse = await fetch(`http://127.0.0.1:${proxyPort}/assemblies`, { + method: 'POST', + body: new URLSearchParams({ params: '{}' }), + }) + expect(createResponse.status).toBe(200) + + await Promise.race([ + notifyReceived, + delay(3_000).then(() => { + throw new Error('Timed out waiting for notify request') + }), + ]) + + expect(pollCount).toBe(2) + expect(notifyTransloadit).toBeTypeOf('string') + expect(notifySignature).toBeTypeOf('string') + + if (notifyTransloadit === null || notifySignature === null) { + throw new Error('Notify payload did not include transloadit + signature fields') + } + + expect(notifySignature).toBe(getSignature(secret, notifyTransloadit)) + + const body = parseJsonRecord(notifyTransloadit) + expect(body.ok).toBe('ASSEMBLY_COMPLETED') + } finally { + proxy.close() + await closeServer(notifyServer) + await closeServer(upstreamServer) + } + }, 10_000) +}) diff --git a/packages/notify-url-relay/test/helpers.ts b/packages/notify-url-relay/test/helpers.ts new file mode 100644 index 00000000..8319c309 --- /dev/null +++ b/packages/notify-url-relay/test/helpers.ts @@ -0,0 +1,103 @@ +import { once } from 'node:events' +import type { IncomingMessage, Server, ServerResponse } from 'node:http' +import { setTimeout as delay } from 'node:timers/promises' + +import type { ProxySettings } from '../src/index.ts' + +type RelayLike = { + run: (opts?: Partial) => void + waitForListenPort: () => Promise +} + +function isRecord(value: unknown): value is Record { + return typeof value === 'object' && value !== null && !Array.isArray(value) +} + +export async function readBody(request: IncomingMessage): Promise { + const chunks: Buffer[] = [] + for await (const chunk of request) { + chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk)) + } + + return Buffer.concat(chunks).toString('utf-8') +} + +export async function listen(server: Server): Promise { + server.listen(0, '127.0.0.1') + await once(server, 'listening') + const address = server.address() + if (address === null || typeof address === 'string') { + throw new Error('Could not resolve server address') + } + return address.port +} + +export async function closeServer(server: Server): Promise { + await new Promise((resolve) => { + server.close(() => resolve()) + }) +} + +export function startRelay( + relay: RelayLike, + settings: Omit, 'port'> = {}, +): Promise { + relay.run({ + ...settings, + port: 0, + }) + return relay.waitForListenPort() +} + +export function json(response: ServerResponse, statusCode: number, payload: unknown): void { + response.writeHead(statusCode, { 'content-type': 'application/json; charset=utf-8' }) + response.end(JSON.stringify(payload)) +} + +export function parseJsonRecord(value: string): Record { + const parsed: unknown = JSON.parse(value) + if (!isRecord(parsed)) { + throw new Error('Expected a JSON object payload.') + } + + return parsed +} + +export async function readJsonRecord(response: Response): Promise> { + const payload: unknown = await response.json() + if (!isRecord(payload)) { + throw new Error('Expected JSON response object.') + } + + return payload +} + +export function getSetCookieHeaders(headers: Headers): string[] { + const maybeGetSetCookie = Reflect.get(headers, 'getSetCookie') + if (typeof maybeGetSetCookie === 'function') { + const values = maybeGetSetCookie.call(headers) + if (Array.isArray(values)) { + return values.filter((value): value is string => typeof value === 'string') + } + } + + const fallback = headers.get('set-cookie') + return fallback ? [fallback] : [] +} + +export async function waitFor( + fn: () => boolean, + timeoutMs: number, + intervalMs = 10, + errorMessage = 'Timed out waiting for condition', +): Promise { + const deadline = Date.now() + timeoutMs + while (Date.now() < deadline) { + if (fn()) { + return + } + await delay(intervalMs) + } + + throw new Error(errorMessage) +} diff --git a/packages/notify-url-relay/test/index.test.ts b/packages/notify-url-relay/test/index.test.ts new file mode 100644 index 00000000..6b4827e8 --- /dev/null +++ b/packages/notify-url-relay/test/index.test.ts @@ -0,0 +1,50 @@ +import { describe, expect, it } from 'vitest' + +import { extractAssemblyUrl, getAssemblyState, getSignature } from '../src/index.ts' + +describe('getSignature', () => { + it('creates a sha384 prefixed hmac signature', () => { + const signature = getSignature('foo_secret', '{"ok":"ASSEMBLY_COMPLETED"}') + expect(signature).toBe( + 'sha384:bf26800c5256b38bbf9375c76894d5b649751903973f99d0f036c8e52f6cda287bed711b73c21dbde4d4df6c8fc540a1', + ) + }) +}) + +describe('extractAssemblyUrl', () => { + it('extracts assembly_url from proxy payload', () => { + expect(extractAssemblyUrl('{"assembly_url":"https://example.test/a/123"}')).toBe( + 'https://example.test/a/123', + ) + }) + + it('extracts assembly_ssl_url when present', () => { + expect(extractAssemblyUrl('{"assembly_ssl_url":"https://secure.example.test/a/123"}')).toBe( + 'https://secure.example.test/a/123', + ) + }) + + it('returns null for invalid payloads', () => { + expect(extractAssemblyUrl('nope')).toBeNull() + expect(extractAssemblyUrl('{"foo":"bar"}')).toBeNull() + }) +}) + +describe('getAssemblyState', () => { + it('accepts known states', () => { + expect(getAssemblyState({ ok: 'ASSEMBLY_COMPLETED' })).toBe('ASSEMBLY_COMPLETED') + expect(getAssemblyState({ ok: 'ASSEMBLY_CANCELED' })).toBe('ASSEMBLY_CANCELED') + expect(getAssemblyState({ ok: 'REQUEST_ABORTED' })).toBe('REQUEST_ABORTED') + expect(getAssemblyState({ ok: 'ASSEMBLY_UPLOADING' })).toBe('ASSEMBLY_UPLOADING') + expect(getAssemblyState({ ok: 'ASSEMBLY_EXECUTING' })).toBe('ASSEMBLY_EXECUTING') + expect(getAssemblyState({ ok: 'ASSEMBLY_REPLAYING' })).toBe('ASSEMBLY_REPLAYING') + }) + + it('rejects unknown states', () => { + expect(() => getAssemblyState({ ok: 'UNKNOWN' })).toThrow('Unknown Assembly state found') + }) + + it('rejects malformed payloads', () => { + expect(() => getAssemblyState(null)).toThrow('No ok field found') + }) +}) diff --git a/packages/notify-url-relay/test/network.test.ts b/packages/notify-url-relay/test/network.test.ts new file mode 100644 index 00000000..1f7fac33 --- /dev/null +++ b/packages/notify-url-relay/test/network.test.ts @@ -0,0 +1,247 @@ +import { createServer } from 'node:http' +import { setTimeout as delay } from 'node:timers/promises' +import { gzipSync } from 'node:zlib' + +import { describe, expect, it } from 'vitest' + +import { TransloaditNotifyUrlProxy } from '../src/index.ts' +import { + closeServer, + getSetCookieHeaders, + json, + listen, + readJsonRecord, + startRelay, +} from './helpers.ts' + +describe('proxy network behavior', () => { + it('streams large upstream response bodies', async () => { + const upstreamServer = createServer(async (request, response) => { + if (request.method !== 'GET' || request.url !== '/large') { + response.writeHead(404) + response.end() + return + } + + response.writeHead(200, { 'content-type': 'text/plain; charset=utf-8' }) + for (let i = 0; i < 256; i += 1) { + response.write(`chunk-${i.toString().padStart(3, '0')}-`) + await delay(1) + } + response.end('done') + }) + + const upstreamPort = await listen(upstreamServer) + + const proxy = new TransloaditNotifyUrlProxy('secret', undefined, { logLevel: 0 }) + const proxyPort = await startRelay(proxy, { target: `http://127.0.0.1:${upstreamPort}` }) + + try { + const response = await fetch(`http://127.0.0.1:${proxyPort}/large`) + const body = await response.text() + + expect(response.status).toBe(200) + expect(body.startsWith('chunk-000-')).toBe(true) + expect(body.endsWith('done')).toBe(true) + expect(body.length).toBeGreaterThan(2_000) + } finally { + proxy.close() + await closeServer(upstreamServer) + } + }, 10_000) + + it('passes redirects through without following them', async () => { + const upstreamServer = createServer((request, response) => { + if (request.url === '/redirect') { + response.writeHead(302, { location: '/final-destination' }) + response.end() + return + } + + response.writeHead(404) + response.end() + }) + + const upstreamPort = await listen(upstreamServer) + + const proxy = new TransloaditNotifyUrlProxy('secret', undefined, { logLevel: 0 }) + const proxyPort = await startRelay(proxy, { target: `http://127.0.0.1:${upstreamPort}` }) + + try { + const response = await fetch(`http://127.0.0.1:${proxyPort}/redirect`, { redirect: 'manual' }) + expect(response.status).toBe(302) + expect(response.headers.get('location')).toBe('/final-destination') + } finally { + proxy.close() + await closeServer(upstreamServer) + } + }) + + it('rejects protocol-relative request URLs that would override target host', async () => { + let attackerHit = false + + const targetServer = createServer((request, response) => { + response.writeHead(200, { 'content-type': 'text/plain; charset=utf-8' }) + response.end(`target:${request.url ?? '/'}`) + }) + const attackerServer = createServer((_, response) => { + attackerHit = true + response.writeHead(200, { 'content-type': 'text/plain; charset=utf-8' }) + response.end('attacker') + }) + + const targetPort = await listen(targetServer) + const attackerPort = await listen(attackerServer) + + const proxy = new TransloaditNotifyUrlProxy('secret', undefined, { logLevel: 0 }) + const proxyPort = await startRelay(proxy, { target: `http://127.0.0.1:${targetPort}` }) + + try { + const response = await fetch( + `http://127.0.0.1:${proxyPort}//127.0.0.1:${attackerPort}/escaped-host`, + ) + const payload = await readJsonRecord(response) + + expect(response.status).toBe(502) + expect(response.headers.get('x-notify-proxy-error-code')).toBe('FORWARD_UPSTREAM_ERROR') + expect(payload.error).toBe('FORWARD_UPSTREAM_ERROR') + expect(attackerHit).toBe(false) + } finally { + proxy.close() + await closeServer(targetServer) + await closeServer(attackerServer) + } + }) + + it('passes through multiple set-cookie headers', async () => { + const upstreamServer = createServer((request, response) => { + if (request.url === '/cookies') { + response.writeHead(200, { + 'set-cookie': ['a=1; Path=/', 'b=2; Path=/'], + 'content-type': 'text/plain; charset=utf-8', + }) + response.end('ok') + return + } + + response.writeHead(404) + response.end() + }) + + const upstreamPort = await listen(upstreamServer) + + const proxy = new TransloaditNotifyUrlProxy('secret', undefined, { logLevel: 0 }) + const proxyPort = await startRelay(proxy, { target: `http://127.0.0.1:${upstreamPort}` }) + + try { + const response = await fetch(`http://127.0.0.1:${proxyPort}/cookies`) + expect(response.status).toBe(200) + + const cookies = getSetCookieHeaders(response.headers) + + expect(cookies).toEqual(['a=1; Path=/', 'b=2; Path=/']) + } finally { + proxy.close() + await closeServer(upstreamServer) + } + }) + + it('drops encoding headers when fetch decodes compressed upstream payloads', async () => { + const payload = JSON.stringify({ + ok: true, + body: 'x'.repeat(1024), + }) + const compressed = gzipSync(payload) + let forwardedAcceptEncoding: string | undefined + + const upstreamServer = createServer((request, response) => { + if (request.url === '/compressed') { + forwardedAcceptEncoding = + typeof request.headers['accept-encoding'] === 'string' + ? request.headers['accept-encoding'] + : undefined + response.writeHead(200, { + 'content-type': 'application/json; charset=utf-8', + 'content-encoding': 'gzip', + 'content-length': String(compressed.length), + }) + response.end(compressed) + return + } + + response.writeHead(404) + response.end() + }) + + const upstreamPort = await listen(upstreamServer) + + const proxy = new TransloaditNotifyUrlProxy('secret', undefined, { logLevel: 0 }) + const proxyPort = await startRelay(proxy, { target: `http://127.0.0.1:${upstreamPort}` }) + + try { + const response = await fetch(`http://127.0.0.1:${proxyPort}/compressed`) + const body = await response.text() + + expect(response.status).toBe(200) + expect(body).toBe(payload) + expect(response.headers.get('content-encoding')).toBeNull() + expect(response.headers.get('content-length')).toBeNull() + expect(forwardedAcceptEncoding).toBe('identity') + } finally { + proxy.close() + await closeServer(upstreamServer) + } + }) + + it('returns timeout code when upstream exceeds forward timeout', async () => { + const upstreamServer = createServer(async (request, response) => { + if (request.url === '/slow') { + await delay(200) + json(response, 200, { ok: true }) + return + } + + response.writeHead(404) + response.end() + }) + + const upstreamPort = await listen(upstreamServer) + + const proxy = new TransloaditNotifyUrlProxy('secret', undefined, { logLevel: 0 }) + const proxyPort = await startRelay(proxy, { + target: `http://127.0.0.1:${upstreamPort}`, + forwardTimeoutMs: 30, + }) + + try { + const response = await fetch(`http://127.0.0.1:${proxyPort}/slow`) + const payload = await readJsonRecord(response) + + expect(response.status).toBe(504) + expect(response.headers.get('x-notify-proxy-error-code')).toBe('FORWARD_TIMEOUT') + expect(payload.error).toBe('FORWARD_TIMEOUT') + } finally { + proxy.close() + await closeServer(upstreamServer) + } + }) + + it('returns upstream-error code when target cannot be reached', async () => { + const proxy = new TransloaditNotifyUrlProxy('secret', undefined, { logLevel: 0 }) + const proxyPort = await startRelay(proxy, { + target: 'http://127.0.0.1:1', + forwardTimeoutMs: 200, + }) + + try { + const response = await fetch(`http://127.0.0.1:${proxyPort}/unreachable`) + const payload = await readJsonRecord(response) + + expect(response.status).toBe(502) + expect(response.headers.get('x-notify-proxy-error-code')).toBe('FORWARD_UPSTREAM_ERROR') + expect(payload.error).toBe('FORWARD_UPSTREAM_ERROR') + } finally { + proxy.close() + } + }) +}) diff --git a/packages/notify-url-relay/test/real.e2e.test.ts b/packages/notify-url-relay/test/real.e2e.test.ts new file mode 100644 index 00000000..13f89272 --- /dev/null +++ b/packages/notify-url-relay/test/real.e2e.test.ts @@ -0,0 +1,123 @@ +import { existsSync } from 'node:fs' +import { createServer } from 'node:http' +import { setTimeout as delay } from 'node:timers/promises' +import { Transloadit } from 'transloadit' +import { describe, expect, it } from 'vitest' + +import { getSignature, TransloaditNotifyUrlProxy } from '../src/index.ts' +import { closeServer, listen, parseJsonRecord, readBody, startRelay } from './helpers.ts' + +if (typeof process.loadEnvFile === 'function' && existsSync('.env')) { + process.loadEnvFile('.env') +} + +const runReal = + process.env.RUN_REAL_E2E === '1' && + typeof process.env.TRANSLOADIT_KEY === 'string' && + process.env.TRANSLOADIT_KEY.length > 0 && + typeof process.env.TRANSLOADIT_SECRET === 'string' && + process.env.TRANSLOADIT_SECRET.length > 0 + +const describeReal = runReal ? describe : describe.skip + +function requireEnv(name: 'TRANSLOADIT_SECRET' | 'TRANSLOADIT_KEY'): string { + const value = process.env[name] + if (!value) { + throw new Error(`Missing required environment variable ${name}.`) + } + + return value +} + +describeReal('real api e2e', () => { + it('creates a real assembly through the proxy and receives signed notify callback', async () => { + const secret = requireEnv('TRANSLOADIT_SECRET') + const authKey = requireEnv('TRANSLOADIT_KEY') + const endpoint = (process.env.TRANSLOADIT_ENDPOINT || 'https://api2.transloadit.com').replace( + /\/$/, + '', + ) + + let notifyTransloadit: string | null = null + let notifySignature: string | null = null + let resolveNotify: (() => void) | null = null + const notifyReceived = new Promise((resolve) => { + resolveNotify = resolve + }) + + const notifyServer = createServer(async (request, response) => { + if (request.method !== 'POST' || request.url !== '/transloadit') { + response.writeHead(404) + response.end() + return + } + + const payload = new URLSearchParams(await readBody(request)) + notifyTransloadit = payload.get('transloadit') + notifySignature = payload.get('signature') + + response.writeHead(200) + response.end('ok') + resolveNotify?.() + }) + + const notifyPort = await listen(notifyServer) + + const proxy = new TransloaditNotifyUrlProxy( + secret, + `http://127.0.0.1:${notifyPort}/transloadit`, + ) + const proxyPort = await startRelay(proxy, { + target: endpoint, + pollIntervalMs: 1_000, + maxPollAttempts: 120, + }) + + const client = new Transloadit({ + authKey, + authSecret: secret, + endpoint: `http://127.0.0.1:${proxyPort}`, + }) + + try { + const createPromise = client.createAssembly({ + uploads: { + probe: Buffer.from(`notify-url-relay-real-e2e-${Date.now()}`, 'utf-8'), + }, + params: { + steps: { + ':original': { robot: '/upload/handle' }, + }, + }, + waitForCompletion: false, + timeout: 120_000, + }) + const createdAssemblyId = createPromise.assemblyId + expect(typeof createdAssemblyId).toBe('string') + expect(createdAssemblyId.length).toBeGreaterThan(0) + await createPromise + + await Promise.race([ + notifyReceived, + delay(180_000).then(() => { + throw new Error('Timed out waiting for notify callback from proxy') + }), + ]) + + expect(notifyTransloadit).toBeTypeOf('string') + expect(notifySignature).toBeTypeOf('string') + if (notifyTransloadit === null || notifySignature === null) { + throw new Error('Notify payload did not include transloadit + signature fields') + } + + expect(notifySignature).toBe(getSignature(secret, notifyTransloadit)) + + const payload = parseJsonRecord(notifyTransloadit) + expect(payload.assembly_id).toBe(createdAssemblyId) + expect(payload.ok).toBe('ASSEMBLY_COMPLETED') + } finally { + proxy.close() + await closeServer(notifyServer) + } + }, 210_000) +}) diff --git a/packages/notify-url-relay/tsconfig.build.json b/packages/notify-url-relay/tsconfig.build.json new file mode 100644 index 00000000..6b07e33b --- /dev/null +++ b/packages/notify-url-relay/tsconfig.build.json @@ -0,0 +1,21 @@ +{ + "include": ["src"], + "exclude": ["test", "coverage", "dist"], + "compilerOptions": { + "composite": true, + "declaration": true, + "declarationMap": true, + "erasableSyntaxOnly": true, + "isolatedModules": true, + "module": "NodeNext", + "allowImportingTsExtensions": true, + "target": "ES2022", + "noImplicitOverride": true, + "rewriteRelativeImportExtensions": true, + "outDir": "dist", + "resolveJsonModule": true, + "rootDir": "src", + "sourceMap": true, + "strict": true + } +} diff --git a/packages/notify-url-relay/tsconfig.json b/packages/notify-url-relay/tsconfig.json new file mode 100644 index 00000000..59824675 --- /dev/null +++ b/packages/notify-url-relay/tsconfig.json @@ -0,0 +1,16 @@ +{ + "exclude": ["dist", "src", "coverage"], + "references": [{ "path": "./tsconfig.build.json" }], + "compilerOptions": { + "checkJs": true, + "erasableSyntaxOnly": true, + "isolatedModules": true, + "module": "NodeNext", + "allowImportingTsExtensions": true, + "noImplicitOverride": true, + "noEmit": true, + "resolveJsonModule": true, + "strict": true, + "types": ["vitest/globals"] + } +} diff --git a/packages/notify-url-relay/vitest.config.ts b/packages/notify-url-relay/vitest.config.ts new file mode 100644 index 00000000..55d181b3 --- /dev/null +++ b/packages/notify-url-relay/vitest.config.ts @@ -0,0 +1,16 @@ +import { fileURLToPath } from 'node:url' +import { defineConfig } from 'vitest/config' + +const includeRealE2E = process.env.RUN_REAL_E2E === '1' +const transloaditPath = fileURLToPath(new URL('../node/src/Transloadit.ts', import.meta.url)) + +export default defineConfig({ + resolve: { + alias: [{ find: /^transloadit$/, replacement: transloaditPath }], + }, + test: { + environment: 'node', + include: ['test/**/*.test.ts'], + exclude: includeRealE2E ? [] : ['test/real.e2e.test.ts'], + }, +}) diff --git a/tsconfig.json b/tsconfig.json index 6187214f..d5aedc09 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -6,6 +6,7 @@ }, "references": [ { "path": "./packages/node" }, + { "path": "./packages/notify-url-relay" }, { "path": "./packages/types" }, { "path": "./packages/utils" }, { "path": "./packages/zod" } diff --git a/yarn.lock b/yarn.lock index 24b6b6bf..9ebd10ac 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2581,6 +2581,22 @@ __metadata: languageName: unknown linkType: soft +"@transloadit/notify-url-relay@workspace:packages/notify-url-relay": + version: 0.0.0-use.local + resolution: "@transloadit/notify-url-relay@workspace:packages/notify-url-relay" + dependencies: + "@transloadit/sev-logger": "npm:^0.1.9" + "@transloadit/utils": "npm:^4.3.0" + "@transloadit/zod": "npm:^4.3.0" + "@types/node": "npm:^24.10.3" + dotenv: "npm:^17.2.3" + p-retry: "npm:^7.1.1" + transloadit: "npm:^4.7.4" + bin: + notify-url-relay: ./dist/cli.js + languageName: unknown + linkType: soft + "@transloadit/sev-logger@npm:^0.1.9": version: 0.1.9 resolution: "@transloadit/sev-logger@npm:0.1.9" @@ -2604,7 +2620,7 @@ __metadata: languageName: unknown linkType: soft -"@transloadit/zod@workspace:packages/zod": +"@transloadit/zod@npm:^4.3.0, @transloadit/zod@workspace:packages/zod": version: 0.0.0-use.local resolution: "@transloadit/zod@workspace:packages/zod" dependencies: @@ -6271,6 +6287,15 @@ __metadata: languageName: node linkType: hard +"p-retry@npm:^7.1.1": + version: 7.1.1 + resolution: "p-retry@npm:7.1.1" + dependencies: + is-network-error: "npm:^1.1.0" + checksum: 10c0/d72fb15dace25b8bf72c97a13c8a630ad1deb4667e708955e8806ee38f1d70e9611598ebe57bd9677349256e024a5599292f99aabb203143e0e13f1735e30818 + languageName: node + linkType: hard + "p-timeout@npm:^7.0.0": version: 7.0.1 resolution: "p-timeout@npm:7.0.1" @@ -7627,7 +7652,7 @@ __metadata: languageName: unknown linkType: soft -"transloadit@workspace:packages/transloadit": +"transloadit@npm:^4.7.4, transloadit@workspace:packages/transloadit": version: 0.0.0-use.local resolution: "transloadit@workspace:packages/transloadit" dependencies: