Skip to content

Commit

Permalink
Call "onStepFinished" on error, and send error as parameter to callback
Browse files Browse the repository at this point in the history
  • Loading branch information
pedrokehl committed Mar 13, 2024
1 parent 2d6eb84 commit 29d48c2
Show file tree
Hide file tree
Showing 12 changed files with 248 additions and 58 deletions.
2 changes: 1 addition & 1 deletion .eslintrc
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
"no-plusplus": "off",
"no-underscore-dangle": ["error"],
"no-await-in-loop": "off",
"complexity": ["error", { "max": 4 }],
"complexity": ["error", { "max": 5 }],
"class-methods-use-this": "off",
"@typescript-eslint/consistent-type-imports": ["error", { "fixStyle": "inline-type-imports" }]
},
Expand Down
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ npm run test:watch

## Roadmap

- Allow a provided callback that receives the bag for generating the seed of `reduce`.
- Wrap steps in a try catch so we can call logger with the step error.
- Write test for “maxItemsFlowing” in combination with “reduce” + following pipes
- Better wrap filter and add logging to it
- Use “wrappedGenerator” in “wrappedGeneratorWithBackPressure” to remove duplicates
- Proper typing on ValueBag and how it's handled in child steps.
13 changes: 9 additions & 4 deletions src/operators/batch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,15 @@ export function batch(params: BatchParams, loggers: Loggers): OperatorApplier {
async function wrappedStep(valueBag: ValueBag[]): Promise<ValueBag[]> {
loggers.onStepStarted(valueBag)
const startTime = new Date()
const values = await params.fn([...valueBag])
const newValueBags = getBag(valueBag, values)
loggers.onStepFinished(newValueBags, startTime)
return newValueBags
try {
const values = await params.fn([...valueBag])
const newValueBags = getBag(valueBag, values)
loggers.onStepFinished(newValueBags, startTime)
return newValueBags
} catch (err) {
loggers.onStepFinished(valueBag, startTime, err as Error)
throw err
}
}

return function operatorApplier(observable: Observable<ValueBag>) {
Expand Down
51 changes: 31 additions & 20 deletions src/operators/generator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,21 @@ export function wrapGenerator(generatorParams: FromGeneratorParams, loggers: Log
loggers.onStepStarted(bagArrayForLogger)
let isStart = true
let startTime = new Date()
for await (const value of generatorParams.fn(initialBag)) {
if (!isStart) {
loggers.onStepStarted(bagArrayForLogger)

try {
for await (const value of generatorParams.fn(initialBag)) {
if (!isStart) {
loggers.onStepStarted(bagArrayForLogger)
}
isStart = false
const newValueBag = getNewValueBag(initialBag, generatorParams.provides, value)
loggers.onStepFinished([newValueBag], startTime)
yield newValueBag
startTime = new Date()
}
isStart = false
const newValueBag = getNewValueBag(initialBag, generatorParams.provides, value)
loggers.onStepFinished([newValueBag], startTime)
yield newValueBag
startTime = new Date()
} catch (err) {
loggers.onStepFinished([initialBag], startTime, err as Error)
throw err
}
}
}
Expand All @@ -36,19 +42,24 @@ export function wrapGeneratorWithBackPressure(
loggers.onStepStarted(bagArrayForLogger)
let isStart = true
let startTime = new Date()
for await (const value of generatorParams.fn(initialBag)) {
if (!isStart) {
loggers.onStepStarted(bagArrayForLogger)
}
isStart = false
const newValueBag = getNewValueBag(initialBag, generatorParams.provides, value)
if (needsToWaitForBackpressure(pendingDataControl, maxItemsFlowing)) {
await waitOnBackpressure(maxItemsFlowing, pendingDataControl)
try {
for await (const value of generatorParams.fn(initialBag)) {
if (!isStart) {
loggers.onStepStarted(bagArrayForLogger)
}
isStart = false
const newValueBag = getNewValueBag(initialBag, generatorParams.provides, value)
if (needsToWaitForBackpressure(pendingDataControl, maxItemsFlowing)) {
await waitOnBackpressure(maxItemsFlowing, pendingDataControl)
}
pendingDataControl.increment()
loggers.onStepFinished([newValueBag], startTime)
yield newValueBag
startTime = new Date()
}
pendingDataControl.increment()
loggers.onStepFinished([newValueBag], startTime)
yield newValueBag
startTime = new Date()
} catch (err) {
loggers.onStepFinished([initialBag], startTime, err as Error)
throw err
}
}
}
Expand Down
13 changes: 9 additions & 4 deletions src/operators/pipe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,15 @@ export function pipe(params: PipeParams, loggers: Loggers): OperatorApplier {
async function wrappedStep(valueBag: ValueBag): Promise<ValueBag> {
loggers.onStepStarted([valueBag])
const startTime = new Date()
const value = await params.fn({ ...valueBag })
const newBag = getBag(valueBag, value)
loggers.onStepFinished([newBag], startTime)
return newBag
try {
const value = await params.fn({ ...valueBag })
const newBag = getBag(valueBag, value)
loggers.onStepFinished([newBag], startTime)
return newBag
} catch (err) {
loggers.onStepFinished([valueBag], startTime, err as Error)
throw err
}
}
return mergeMap(wrappedStep, params?.maxConcurrency)
}
Expand Down
16 changes: 11 additions & 5 deletions src/operators/reduce.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,17 @@ export function reduce<T>(
function wrappedReduce(acc: T, valueBag: ValueBag, index: number): T {
const startedAt = new Date()
loggers.onStepStarted([valueBag])
const reduceResult = reduceParams.fn(acc, valueBag, index)
pendingDataControl?.decrement()
loggers.onStepFinished([valueBag], startedAt)
lastBag = valueBag
return reduceResult
try {
const reduceResult = reduceParams.fn(acc, valueBag, index)
loggers.onStepFinished([valueBag], startedAt)
lastBag = valueBag
return reduceResult
} catch (err) {
loggers.onStepFinished([valueBag], startedAt, err as Error)
throw err
} finally {
pendingDataControl?.decrement()
}
}

return function operatorApplier(observable: Observable<ValueBag>) {
Expand Down
1 change: 1 addition & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ export type OnStepFinishedParams = {
tookMs: number
emitted: number
valueBags: ValueBag[]
error?: Error
}

export type Loggers = { onStepFinished: InternalOnStepFinished; onStepStarted: InternalOnStepStarted }
Expand Down
8 changes: 4 additions & 4 deletions src/utils/onStepFinished.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import type { OnStepFinished, ValueBag } from '../types'
import { type OnStepFinished, type ValueBag } from '../types'

export type InternalOnStepFinished = (valueBags: ValueBag[], stepStartedAt: Date) => void
export type InternalOnStepFinished = (valueBags: ValueBag[], stepStartedAt: Date, error?: Error) => void

const stub = () => {}

export function getOnStepFinished(name: string, onStepFinished?: OnStepFinished): InternalOnStepFinished {
if (onStepFinished) {
return function internalOnStepFinished(valueBags: ValueBag[], stepStartedAt: Date) {
return function internalOnStepFinished(valueBags: ValueBag[], stepStartedAt: Date, error?: Error) {
const now = Date.now()
const tookMs = now - stepStartedAt.getTime()
onStepFinished({ name, tookMs, valueBags, emitted: valueBags.length })
onStepFinished({ name, tookMs, valueBags, emitted: valueBags.length, error })
}
}
return stub
Expand Down
49 changes: 33 additions & 16 deletions test/integration/error.test.ts
Original file line number Diff line number Diff line change
@@ -1,42 +1,59 @@
import { fromGenerator } from '../../src'
import { getMockedGenerator } from '../mocks/generator.mock'
import { getMockedGenerator, getThrowingGenerator } from '../mocks/generator.mock'

describe('Error Handling', () => {
test('Should pass generator error to run call stack', () => {
// eslint-disable-next-line require-yield
async function* generator(): AsyncGenerator<number> {
throw new Error('Generator error')
}

const caminho = fromGenerator({ fn: generator, provides: 'generatedData' })
.pipe({ fn: jest.fn() })
test('Should pass "generator" error to run call stack', async () => {
const caminho = fromGenerator({ fn: getThrowingGenerator(new Error('Generator error')), provides: 'generatedData' })
await expect(caminho.run()).rejects.toMatchObject({ message: 'Generator error' })
})

return expect(caminho.run()).rejects.toMatchObject({ message: 'Generator error' })
test('Should pass "generator with backpressure" error to run call stack', async () => {
const onStepFinished = jest.fn()
const options = { maxItemsFlowing: 1, onStepFinished }
const throwingGenerator = getThrowingGenerator(new Error('Generator error'))
const caminho = fromGenerator({ fn: throwingGenerator, provides: 'generatedData' }, options)
await expect(caminho.run()).rejects.toMatchObject({ message: 'Generator error' })
})

test('Should pass operator error to run call stack', () => {
test('Should pass "pipe" error to run call stack', async () => {
const operator = jest.fn().mockRejectedValue(new Error('Operator error'))
const caminho = fromGenerator({ fn: getMockedGenerator([1, 2]), provides: 'number' })
.pipe({ fn: operator })

return expect(caminho.run()).rejects.toMatchObject({ message: 'Operator error' })
await expect(caminho.run()).rejects.toMatchObject({ message: 'Operator error' })
})

test('Should pass "batch" error to run call stack', async () => {
const operator = jest.fn().mockRejectedValue(new Error('Operator error'))
const caminho = fromGenerator({ fn: getMockedGenerator([1, 2]), provides: 'number' })
.pipe({ fn: operator, batch: { maxSize: 10, timeoutMs: 1 } })

await expect(caminho.run()).rejects.toMatchObject({ message: 'Operator error' })
})

test('Should pass "parallel" error to run call stack', async () => {
const operator = jest.fn().mockRejectedValue(new Error('Operator error'))
const caminho = fromGenerator({ fn: getMockedGenerator([1, 2]), provides: 'number' })
.parallel([{ fn: operator }])

await expect(caminho.run()).rejects.toMatchObject({ message: 'Operator error' })
})

test('Should pass filter error to run call stack', () => {
test('Should pass "filter" error to run call stack', async () => {
const caminho = fromGenerator({ fn: getMockedGenerator([1, 2]), provides: 'number' })
.filter(() => { throw new Error('Filter error') })

return expect(caminho.run()).rejects.toMatchObject({ message: 'Filter error' })
await expect(caminho.run()).rejects.toMatchObject({ message: 'Filter error' })
})

test('Should pass reduce error to run call stack', () => {
test('Should pass "reduce" error to run call stack', async () => {
const caminho = fromGenerator({ fn: getMockedGenerator([1, 2]), provides: 'number' })
.reduce({
fn: () => { throw new Error('Reduce error') },
seed: 0,
provides: 'doesntMatter',
})

return expect(caminho.run()).rejects.toMatchObject({ message: 'Reduce error' })
await expect(caminho.run()).rejects.toMatchObject({ message: 'Reduce error' })
})
})
128 changes: 127 additions & 1 deletion test/integration/onStepFinished.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { fromGenerator, fromFn } from '../../src'

import { getMockedJobGenerator } from '../mocks/generator.mock'
import { getGeneratorThrowsAfterThYields, getMockedJobGenerator, getThrowingGenerator } from '../mocks/generator.mock'
import { getOnStepFinishedParamsFixture } from '../mocks/stepResult.mock'

describe('onStepFinished', () => {
Expand Down Expand Up @@ -57,4 +57,130 @@ describe('onStepFinished', () => {
await fromFn({ fn: getJob, provides: 'job' }, { onStepFinished: onStepFinishedMock }).run()
expect(onStepFinishedMock.mock.calls).toEqual([[getOnStepFinishedParamsFixture({ name: 'getJob', emitted: 1 })]])
})

describe('with error', () => {
it('forwards the error from .pipe fn to onStepFinished and stops the flow', async () => {
const error = new Error('fetchError')
const generatorMock = getMockedJobGenerator(1)
const fetchMock = jest.fn().mockRejectedValue(error)
const saveMock = jest.fn()
const onStepFinished = jest.fn().mockName('onStepFinishedLog')

const flow = fromGenerator({ name: 'generator', fn: generatorMock, provides: 'job' }, { onStepFinished })
.pipe({ name: 'fetchSomething', fn: fetchMock, provides: 'rawData', maxConcurrency: 5 })
.pipe({ fn: saveMock })

await expect(flow.run).rejects.toThrow(error)
expect(onStepFinished.mock.calls).toEqual([
[getOnStepFinishedParamsFixture({ name: 'generator' })],
[getOnStepFinishedParamsFixture({ name: 'fetchSomething', error })],
])
})

it('forwards the error from BATCHED .pipe fn to onStepFinished and stops the flow', async () => {
const error = new Error('fetchError')
const generatorMock = getMockedJobGenerator(1)
const fetchMock = jest.fn().mockRejectedValue(error)
const saveMock = jest.fn()
const onStepFinished = jest.fn().mockName('onStepFinishedLog')

const flow = fromGenerator({ name: 'generator', fn: generatorMock, provides: 'job' }, { onStepFinished })
.pipe({ name: 'batchSomething', fn: fetchMock, batch: { maxSize: 1, timeoutMs: 10 } })
.pipe({ fn: saveMock })

await expect(flow.run).rejects.toThrow(error)
expect(onStepFinished.mock.calls).toEqual([
[getOnStepFinishedParamsFixture({ name: 'generator' })],
[getOnStepFinishedParamsFixture({ name: 'batchSomething', error })],
])
})

it('forwards the error from .parallel fn to onStepFinished and stops the flow', async () => {
const error = new Error('fetchError')
const generatorMock = getMockedJobGenerator(1)
const fetchMock = jest.fn().mockRejectedValue(error)
const saveMock = jest.fn()
const onStepFinished = jest.fn().mockName('onStepFinishedLog')

const flow = fromGenerator({ name: 'generator', fn: generatorMock, provides: 'job' }, { onStepFinished })
.parallel([{ name: 'batchSomething', fn: fetchMock, batch: { maxSize: 1, timeoutMs: 10 } }])
.pipe({ fn: saveMock })

await expect(flow.run).rejects.toThrow(error)
expect(onStepFinished.mock.calls).toEqual([
[getOnStepFinishedParamsFixture({ name: 'generator' })],
[getOnStepFinishedParamsFixture({ name: 'batchSomething', error })],
])
})

it('forwards the error from .reduce fn to onStepFinished and stops the flow', async () => {
const error = new Error('reduceError')
const generatorMock = getMockedJobGenerator(1)
const reduceMock = jest.fn().mockImplementation(() => { throw error })
const saveMock = jest.fn()
const onStepFinished = jest.fn().mockName('onStepFinishedLog')

const flow = fromGenerator({ name: 'generator', fn: generatorMock, provides: 'job' }, { onStepFinished })
.reduce({ name: 'reduceSomething', fn: reduceMock, provides: 'acc', seed: 0 })
.pipe({ fn: saveMock })

await expect(flow.run).rejects.toThrow(error)
expect(onStepFinished.mock.calls).toEqual([
[getOnStepFinishedParamsFixture({ name: 'generator' })],
[getOnStepFinishedParamsFixture({ name: 'reduceSomething', error })],
])
})

it('forwards the error from generator to onStepFinished and stops the flow', async () => {
const error = new Error('generator error mock')
const generatorMock = getThrowingGenerator(error)
const reduceMock = jest.fn()
const saveMock = jest.fn()
const onStepFinished = jest.fn().mockName('onStepFinishedLog')

const flow = fromGenerator({ name: 'generator', fn: generatorMock, provides: 'job' }, { onStepFinished })
.reduce({ name: 'reduceSomething', fn: reduceMock, provides: 'acc', seed: 0 })
.pipe({ fn: saveMock })

await expect(flow.run).rejects.toThrow(error)
expect(onStepFinished.mock.calls).toEqual([
[getOnStepFinishedParamsFixture({ name: 'generator', error })],
])
})

it('forwards the error from generator to onStepFinished and stops the flow after few yields worked', async () => {
const error = new Error('generator error mock')
const generatorMock = getGeneratorThrowsAfterThYields(error, 2)
const saveMock = jest.fn()
const onStepFinished = jest.fn().mockName('onStepFinishedLog')

const flow = fromGenerator({ name: 'generator', fn: generatorMock, provides: 'item' }, { onStepFinished })
.pipe({ fn: saveMock, name: 'saveMock' })

await expect(flow.run).rejects.toThrow(error)
expect(onStepFinished.mock.calls).toEqual([
[getOnStepFinishedParamsFixture({ name: 'generator' })],
[getOnStepFinishedParamsFixture({ name: 'saveMock' })],
[getOnStepFinishedParamsFixture({ name: 'generator' })],
[getOnStepFinishedParamsFixture({ name: 'saveMock' })],
[getOnStepFinishedParamsFixture({ name: 'generator', error })],
])
})

it('forwards the error from .from fn with BACKPRESSURE to onStepFinished and stops the flow', async () => {
const error = new Error('generator error mock')
const generatorMock = getThrowingGenerator(error)
const saveMock = jest.fn()
const onStepFinished = jest.fn().mockName('onStepFinishedLog')
const options = { onStepFinished, maxItemsFlowing: 1 }

const flow = fromGenerator({ name: 'generator', fn: generatorMock, provides: 'job' }, options)
.pipe({ fn: saveMock })

await expect(flow.run).rejects.toThrow(error)
expect(onStepFinished.mock.calls).toEqual([
[getOnStepFinishedParamsFixture({ name: 'generator', error })],
])
})
})
})
Loading

0 comments on commit 29d48c2

Please sign in to comment.