Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove the "keepLastValues" parameter of run #39

Merged
merged 1 commit into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@ deno add @pedrokehl/caminho
*After the steps definition, execute your Caminho flow by calling `.run()`.*

`run`: Returns a Promise which is fulfilled when the Generator has finished providing values and all the items have been processed by all the defined steps in the Caminho flow.
The function takes two parameters:
- `initialValueBag: ValueBag`: An initial valueBag, which is passed through all steps.
- `pickLastValues: string[]`: List of properties you want to be returned by run execution, only last values are executed, useful mainly for data that got aggregated with reduce.
The function takes an initial valueBag as parameter, which is passed to the child steps, and the returned value from the promise is an object that contains the context of the last execution of the flow.

Simple flow:

Expand Down Expand Up @@ -151,7 +149,7 @@ To use it, call `reduce()` with the following properties:
- `fn: (acc: A, value: ValueBag, index: number) => A`, Similar to a callback provided to [Array.reduce](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Array/reduce).
- `seed: A`: Defines the initial `acc` value received on your aggregator function.
- `provides: string`: The property name to be appended to the valueBag with the value returned from the reducer.
- `keep: string[]`: List of the properties that you want to keep the last known value in the valueBag for following steps, useful for child flows.
- `keep: string[]`: Optional list of the properties that you want to keep the last known value in the valueBag for following steps, by default it keeps only the reduce result.

```typescript
function sumPrice(acc: number, item: ValueBag) {
Expand All @@ -162,7 +160,7 @@ const result = await fromGenerator({ fn: generateCars, provides: 'carId' })
.pipe({ fn: fetchPrice, provides: 'price' })
.reduce({ fn: sumPrice, seed: 0, provides: 'sum', keep: ['manufacturer'] })
.pipe( { fn: saveTotalForManufacturer })
.run({ manufacturer: 'Mazda' }, ['sum', 'manufacturer'])
.run({ manufacturer: 'Mazda' })

console.log('result', result)
// result { "sum": 1_532_600, "manufacturer": "Mazda" }
Expand Down
4 changes: 2 additions & 2 deletions benchmark/parallel.benchmark.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ async function runParallelBenchmark(parentItems: number, childItemsPerParent: nu
console.time('initialize caminho')
const benchmarkCaminho = fromGenerator(steps.parentGenerator, { maxItemsFlowing: 1_000 })
.parallel([steps.pipe1, steps.pipe2])
.pipe({ fn: childCaminho.run.bind(null, {}, ['count']), provides: 'child' })
.pipe({ fn: childCaminho.run, provides: 'child' })
.reduce({
fn: (acc: number, bag: ValueBag): number => acc + bag.child.count,
seed: 0,
Expand All @@ -28,7 +28,7 @@ async function runParallelBenchmark(parentItems: number, childItemsPerParent: nu
console.timeEnd('initialize caminho')

console.time('run caminho')
const { count: childProcessed } = await benchmarkCaminho.run({}, ['count']) as { count: number }
const { count: childProcessed } = await benchmarkCaminho.run()
console.timeEnd('run caminho')

if (childProcessed !== expectedTotalChild) {
Expand Down
4 changes: 2 additions & 2 deletions benchmark/subFlow.benchmark.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ async function runSubflowBenchmark(parentItems: number, childItemsPerParent: num

const parentCaminho = fromGenerator(steps.parentGenerator, { maxItemsFlowing: 1_000 })
.pipe({ fn: steps.pipeFn })
.pipe({ fn: (bag: ValueBag) => childCaminho.run(bag, ['count']), provides: 'child' })
.pipe({ fn: childCaminho.run, provides: 'child' })
.reduce({
fn: (acc: number, bag: ValueBag) => acc + bag.child.count,
seed: 0,
Expand All @@ -28,7 +28,7 @@ async function runSubflowBenchmark(parentItems: number, childItemsPerParent: num
console.timeEnd('initialize caminho')

console.time('run caminho')
const { count: childProcessed } = await parentCaminho.run({}, ['count']) as { count: number }
const { count: childProcessed } = await parentCaminho.run()
console.timeEnd('run caminho')

if (childProcessed !== expectedTotalChild) {
Expand Down
8 changes: 2 additions & 6 deletions src/Caminho.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import { PendingDataControl, PendingDataControlInMemory } from './utils/PendingD

import { getOnStepFinished } from './utils/onStepFinished'
import { getOnStepStarted } from './utils/onStepStarted'
import { pick } from './utils/pick'

export class Caminho implements CaminhoInterface {
private generator: (initialBag: ValueBag) => AsyncGenerator<ValueBag>
Expand Down Expand Up @@ -59,12 +58,9 @@ export class Caminho implements CaminhoInterface {
return this
}

public run(initialBag: ValueBag, pickLastValues: string[]): Promise<unknown>
public run(initialBag?: ValueBag): Promise<undefined>
public async run(initialBag?: ValueBag, pickLastValues?: string[]): Promise<unknown | undefined> {
public async run(initialBag?: ValueBag): Promise<ValueBag> {
const observable$ = this.buildObservable(initialBag)
const lastValue = await lastValueFrom(observable$, { defaultValue: undefined })
return pickLastValues && lastValue ? pick(lastValue, pickLastValues) : undefined
return lastValueFrom(observable$, { defaultValue: initialBag })
}

private getGenerator(generatorParams: FromGeneratorParams): (initialBag: ValueBag) => AsyncGenerator<ValueBag> {
Expand Down
9 changes: 4 additions & 5 deletions src/operators/reduce.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,22 @@ export function reduce<T>(
loggers: Loggers,
pendingDataControl?: PendingDataControl,
): OperatorApplier {
const { provides } = reduceParams
const keepAfterReduce = reduceParams.keep ?? []
let initialBag: ValueBag = {}
const { provides, keep } = reduceParams
let lastBag: ValueBag = {}

function wrappedReduce(acc: T, valueBag: ValueBag, index: number): T {
const startedAt = new Date()
loggers.onStepStarted([valueBag])
const reduceResult = reduceParams.fn(acc, valueBag, index)
pendingDataControl?.decrement()
loggers.onStepFinished([valueBag], startedAt)
initialBag = valueBag
lastBag = valueBag
return reduceResult
}

return function operatorApplier(observable: Observable<ValueBag>) {
return observable
.pipe(reduceRxJs(wrappedReduce, reduceParams.seed))
.pipe(map((reduceResult: T) => getNewValueBag(pick(initialBag, keepAfterReduce), provides, reduceResult)))
.pipe(map((reduceResult: T) => getNewValueBag(pick(lastBag, keep ?? []), provides, reduceResult)))
}
}
4 changes: 1 addition & 3 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@ export interface Caminho {
parallel: (multiPipeParams: PipeGenericParams[]) => this
filter: (predicate: FilterPredicate) => this
reduce: <T>(reduceParams: ReduceParams<T>) => this

run(initialBag: ValueBag, pickLastValues: string[]): Promise<unknown>
run(initialBag?: ValueBag): Promise<undefined>
run(initialBag?: ValueBag): Promise<ValueBag>
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
Expand Down
2 changes: 1 addition & 1 deletion test/integration/error.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,6 @@ describe('Error Handling', () => {
provides: 'doesntMatter',
})

return expect(caminho.run({})).rejects.toMatchObject({ message: 'Reduce error' })
return expect(caminho.run()).rejects.toMatchObject({ message: 'Reduce error' })
})
})
38 changes: 26 additions & 12 deletions test/integration/reduce.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ describe('Reduce', () => {
const generatorMock = getMockedJobGenerator(2)
const saveJob = jest.fn()
const reduceFn = jest.fn().mockImplementation((acc) => (acc + 1))
const saveFn = jest.fn()

const result = await fromGenerator({ fn: generatorMock, provides: 'job' })
await fromGenerator({ fn: generatorMock, provides: 'job' })
.pipe({ fn: saveJob })
.reduce({ fn: reduceFn, provides: 'count', seed: 100 })
.run({}, ['count'])
.pipe({ fn: saveFn })
.run()

expect(result).toEqual({ count: 102 })
expect(saveFn).toHaveBeenCalledWith({ count: 102 })
})

test('Should call aggregator function with correct parameters', async () => {
Expand All @@ -32,17 +34,30 @@ describe('Reduce', () => {
expect(reduceFn).toHaveBeenCalledWith(101, { initial: true, job: { job_id: '2' } }, 1)
})

test('Should keep last values processed in reduce to the run output', async () => {
test('Should keep only the reduce value by default (keep not defined)', async () => {
const generatorMock = getMockedJobGenerator(4)
const saveJob = jest.fn()
const reduceFn = jest.fn().mockImplementation((acc, bag) => (acc + Number(bag.job.job_id)))

const result = await fromGenerator({ fn: generatorMock, provides: 'job' })
.pipe({ fn: saveJob })
.reduce({ fn: reduceFn, provides: 'count', seed: 0 })
.run({ initial: true })

expect(result).toEqual({ count: 10 })
})

test('Should keep the reduce value and the properties passed to "keep"', async () => {
const generatorMock = getMockedJobGenerator(4)
const saveJob = jest.fn()
const reduceFn = jest.fn().mockImplementation((acc, bag) => (acc + Number(bag.job.job_id)))

const result = await fromGenerator({ fn: generatorMock, provides: 'job' })
.pipe({ fn: saveJob })
.reduce({ fn: reduceFn, provides: 'count', seed: 0, keep: ['initial', 'job'] })
.run({ initial: true }, ['initial', 'job', 'count'])
.reduce({ fn: reduceFn, provides: 'count', seed: 0, keep: ['initial'] })
.run({ initial: true })

expect(result).toEqual({ initial: true, job: { job_id: '4' }, count: 10 })
expect(result).toEqual({ count: 10, initial: true })
})

test('Allows flow to continue after the reduce execution', async () => {
Expand All @@ -51,12 +66,12 @@ describe('Reduce', () => {
const reduceFn = jest.fn().mockImplementation((acc, bag) => (acc + Number(bag.job.job_id)))

await fromGenerator({ fn: generatorMock, provides: 'job' })
.reduce({ fn: reduceFn, provides: 'count', seed: 0, keep: ['initial', 'job'] })
.reduce({ fn: reduceFn, provides: 'count', seed: 0 })
.pipe({ fn: saveCount })
.run({ initial: true }, ['initial', 'job', 'count'])
.run({ initial: true })

expect(saveCount).toBeCalledTimes(1)
expect(saveCount).toBeCalledWith({ initial: true, job: { job_id: '4' }, count: 10 })
expect(saveCount).toBeCalledWith({ count: 10 })
})

test('Reduce works fine when combined with backpressure', async () => {
Expand All @@ -79,10 +94,9 @@ describe('Reduce', () => {
fn: reduceFn,
provides: 'count',
seed: 0,
keep: ['initial', 'job'],
})
.pipe({ fn: saveCount, name: 'saveCount' })
.run({ initial: true }, ['initial', 'job', 'count'])
.run()

expect(onStep.mock.calls).toEqual([
[getOnStepFinishedParamsFixture({ name: 'generator' })],
Expand Down
16 changes: 8 additions & 8 deletions test/integration/subCaminho.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ describe('Sub-Caminho', () => {

await fromGenerator(companySteps.generator)
.pipe(companySteps.fetchStatus)
.pipe({ fn: (bag: ValueBag) => employeeCaminho.run(bag, ['count']), provides: 'employees' })
.pipe({ fn: employeeCaminho.run, provides: 'employees' })
.pipe(companySteps.saver)
.run()

Expand Down Expand Up @@ -59,7 +59,7 @@ describe('Sub-Caminho', () => {

await fromGenerator({ ...companySteps.generator }, { maxItemsFlowing: 2 })
.pipe(companySteps.fetchStatus)
.pipe({ fn: (bag: ValueBag) => employeeCaminho.run(bag, ['count']), provides: 'employees' })
.pipe({ fn: employeeCaminho.run, provides: 'employees' })
.pipe({ ...companySteps.saver, batch: { maxSize: 2, timeoutMs: 10 } })
.pipe(finalStepCompany)
.run()
Expand All @@ -85,8 +85,8 @@ describe('Sub-Caminho', () => {

await fromGenerator(companySteps.generator)
.pipe(companySteps.fetchStatus)
.pipe({ fn: (bag: ValueBag) => employeeCaminho.run(bag, ['count']), provides: 'employees' })
.pipe({ fn: (bag: ValueBag) => internCaminho.run(bag, ['count']), provides: 'interns' })
.pipe({ fn: employeeCaminho.run, provides: 'employees' })
.pipe({ fn: internCaminho.run, provides: 'interns' })
.pipe(companySteps.saver)
.run()

Expand All @@ -106,13 +106,13 @@ describe('Sub-Caminho', () => {

const employeeCaminho = fromGenerator(employeeSteps.generator)
.pipe(employeeSteps.mapper)
.pipe({ fn: (bag: ValueBag) => documentsCaminho.run(bag, ['count']), provides: 'savedDocuments' })
.pipe({ fn: documentsCaminho.run, provides: 'savedDocuments' })
.pipe(employeeSteps.saver)
.reduce(employeeSteps.accumulator)

await fromGenerator(companySteps.generator)
.pipe(companySteps.fetchStatus)
.pipe({ fn: (bag: ValueBag) => employeeCaminho.run(bag, ['count']), provides: 'savedEmployees' })
.pipe({ fn: employeeCaminho.run, provides: 'savedEmployees' })
.pipe(companySteps.saver)
.run()

Expand Down Expand Up @@ -156,7 +156,7 @@ function getEmployeeSteps() {
const generator = { fn: employeeGeneratorFn, provides: 'employeeName' }
const mapper = { fn: mapEmployeeFn, provides: 'mappedEmployee' }
const saver = { fn: saveEmployeeFn }
const accumulator: ReduceParams<number> = { fn: (acc: number) => acc + 1, seed: 0, provides: 'count' }
const accumulator: ReduceParams<number> = { fn: (acc: number) => acc + 1, seed: 0, provides: 'count', keep: [] }

return {
generator,
Expand All @@ -172,7 +172,7 @@ function getDocumentSteps() {

const generator = { fn: generatorFn, provides: 'documentId' }
const saver = { fn: saverFn }
const accumulator: ReduceParams<number> = { fn: (acc: number) => acc + 1, seed: 0, provides: 'count' }
const accumulator: ReduceParams<number> = { fn: (acc: number) => acc + 1, seed: 0, provides: 'count', keep: [] }

return {
generator,
Expand Down
Loading