Skip to content

Commit

Permalink
Remove the "keepLastValues" parameter on run and change "keep" behavi…
Browse files Browse the repository at this point in the history
…our on reduce
  • Loading branch information
pedrokehl committed Mar 12, 2024
1 parent c459983 commit e93ff5b
Show file tree
Hide file tree
Showing 9 changed files with 39 additions and 45 deletions.
9 changes: 4 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@ deno add @pedrokehl/caminho
*After the steps definition, execute your Caminho flow by calling `.run()`.*

`run`: Returns a Promise which is fulfilled when the Generator has finished providing values and all the items have been processed by all the defined steps in the Caminho flow.
The function takes two parameters:
- `initialValueBag: ValueBag`: An initial valueBag, which is passed through all steps.
- `pickLastValues: string[]`: List of properties you want to be returned by run execution, only last values are executed, useful mainly for data that got aggregated with reduce.
The function takes an optional initial valueBag as a parameter, which is passed to the child steps. The returned promise resolves to an object that contains the context of the last execution of the flow, or to the initial valueBag when the flow emits no values.

Simple flow:

Expand Down Expand Up @@ -147,11 +145,12 @@ await fromGenerator({ fn: generateCarIds, provides: 'carId' })

#### Reduce
Caminho features a reduce implementation in its flows; it lets you reduce over **all** records of the flow and produce an aggregated property.
Only the last values from previous steps will be kept in memory for the next steps after a reduce.
To use it, call `reduce()` with the following properties:
- `fn: (acc: A, value: ValueBag, index: number) => A`, Similar to a callback provided to [Array.reduce](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Array/reduce).
- `seed: A`: Defines the initial `acc` value received on your aggregator function.
- `provides: string`: The property name to be appended to the valueBag with the value returned from the reducer.
- `keep: string[]`: List of the properties that you want to keep the last known value in the valueBag for following steps, useful for child flows.
- `keep: string[]`: Optional list of the properties whose last known value you want to keep in the valueBag for the following steps; by default, all of them are kept.

```typescript
function sumPrice(acc: number, item: ValueBag) {
Expand All @@ -162,7 +161,7 @@ const result = await fromGenerator({ fn: generateCars, provides: 'carId' })
.pipe({ fn: fetchPrice, provides: 'price' })
.reduce({ fn: sumPrice, seed: 0, provides: 'sum', keep: ['manufacturer'] })
.pipe( { fn: saveTotalForManufacturer })
.run({ manufacturer: 'Mazda' }, ['sum', 'manufacturer'])
.run({ manufacturer: 'Mazda' })

console.log('result', result)
// result { "sum": 1_532_600, "manufacturer": "Mazda" }
Expand Down
4 changes: 2 additions & 2 deletions benchmark/parallel.benchmark.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ async function runParallelBenchmark(parentItems: number, childItemsPerParent: nu
console.time('initialize caminho')
const benchmarkCaminho = fromGenerator(steps.parentGenerator, { maxItemsFlowing: 1_000 })
.parallel([steps.pipe1, steps.pipe2])
.pipe({ fn: childCaminho.run.bind(null, {}, ['count']), provides: 'child' })
.pipe({ fn: childCaminho.run, provides: 'child' })
.reduce({
fn: (acc: number, bag: ValueBag): number => acc + bag.child.count,
seed: 0,
Expand All @@ -28,7 +28,7 @@ async function runParallelBenchmark(parentItems: number, childItemsPerParent: nu
console.timeEnd('initialize caminho')

console.time('run caminho')
const { count: childProcessed } = await benchmarkCaminho.run({}, ['count']) as { count: number }
const { count: childProcessed } = await benchmarkCaminho.run()
console.timeEnd('run caminho')

if (childProcessed !== expectedTotalChild) {
Expand Down
4 changes: 2 additions & 2 deletions benchmark/subFlow.benchmark.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ async function runSubflowBenchmark(parentItems: number, childItemsPerParent: num

const parentCaminho = fromGenerator(steps.parentGenerator, { maxItemsFlowing: 1_000 })
.pipe({ fn: steps.pipeFn })
.pipe({ fn: (bag: ValueBag) => childCaminho.run(bag, ['count']), provides: 'child' })
.pipe({ fn: childCaminho.run, provides: 'child' })
.reduce({
fn: (acc: number, bag: ValueBag) => acc + bag.child.count,
seed: 0,
Expand All @@ -28,7 +28,7 @@ async function runSubflowBenchmark(parentItems: number, childItemsPerParent: num
console.timeEnd('initialize caminho')

console.time('run caminho')
const { count: childProcessed } = await parentCaminho.run({}, ['count']) as { count: number }
const { count: childProcessed } = await parentCaminho.run()
console.timeEnd('run caminho')

if (childProcessed !== expectedTotalChild) {
Expand Down
8 changes: 2 additions & 6 deletions src/Caminho.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import { PendingDataControl, PendingDataControlInMemory } from './utils/PendingD

import { getOnStepFinished } from './utils/onStepFinished'
import { getOnStepStarted } from './utils/onStepStarted'
import { pick } from './utils/pick'

export class Caminho implements CaminhoInterface {
private generator: (initialBag: ValueBag) => AsyncGenerator<ValueBag>
Expand Down Expand Up @@ -59,12 +58,9 @@ export class Caminho implements CaminhoInterface {
return this
}

public run(initialBag: ValueBag, pickLastValues: string[]): Promise<unknown>
public run(initialBag?: ValueBag): Promise<undefined>
public async run(initialBag?: ValueBag, pickLastValues?: string[]): Promise<unknown | undefined> {
public async run(initialBag?: ValueBag): Promise<ValueBag> {
const observable$ = this.buildObservable(initialBag)
const lastValue = await lastValueFrom(observable$, { defaultValue: undefined })
return pickLastValues && lastValue ? pick(lastValue, pickLastValues) : undefined
return lastValueFrom(observable$, { defaultValue: initialBag })
}

private getGenerator(generatorParams: FromGeneratorParams): (initialBag: ValueBag) => AsyncGenerator<ValueBag> {
Expand Down
10 changes: 5 additions & 5 deletions src/operators/reduce.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,23 @@ export function reduce<T>(
loggers: Loggers,
pendingDataControl?: PendingDataControl,
): OperatorApplier {
const { provides } = reduceParams
const keepAfterReduce = reduceParams.keep ?? []
let initialBag: ValueBag = {}
let lastBag: ValueBag = {}

function wrappedReduce(acc: T, valueBag: ValueBag, index: number): T {
const startedAt = new Date()
loggers.onStepStarted([valueBag])
const reduceResult = reduceParams.fn(acc, valueBag, index)
pendingDataControl?.decrement()
loggers.onStepFinished([valueBag], startedAt)
initialBag = valueBag
lastBag = valueBag
return reduceResult
}

return function operatorApplier(observable: Observable<ValueBag>) {
return observable
.pipe(reduceRxJs(wrappedReduce, reduceParams.seed))
.pipe(map((reduceResult: T) => getNewValueBag(pick(initialBag, keepAfterReduce), provides, reduceResult)))
.pipe(map((reduceResult: T) => (reduceParams.keep
? getNewValueBag(pick(lastBag, reduceParams.keep), reduceParams.provides, reduceResult)
: getNewValueBag(lastBag, reduceParams.provides, reduceResult))))
}
}
4 changes: 1 addition & 3 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@ export interface Caminho {
parallel: (multiPipeParams: PipeGenericParams[]) => this
filter: (predicate: FilterPredicate) => this
reduce: <T>(reduceParams: ReduceParams<T>) => this

run(initialBag: ValueBag, pickLastValues: string[]): Promise<unknown>
run(initialBag?: ValueBag): Promise<undefined>
run(initialBag?: ValueBag): Promise<ValueBag>
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
Expand Down
2 changes: 1 addition & 1 deletion test/integration/error.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,6 @@ describe('Error Handling', () => {
provides: 'doesntMatter',
})

return expect(caminho.run({})).rejects.toMatchObject({ message: 'Reduce error' })
return expect(caminho.run()).rejects.toMatchObject({ message: 'Reduce error' })
})
})
27 changes: 14 additions & 13 deletions test/integration/reduce.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ describe('Reduce', () => {
const generatorMock = getMockedJobGenerator(2)
const saveJob = jest.fn()
const reduceFn = jest.fn().mockImplementation((acc) => (acc + 1))
const saveFn = jest.fn()

const result = await fromGenerator({ fn: generatorMock, provides: 'job' })
await fromGenerator({ fn: generatorMock, provides: 'job' })
.pipe({ fn: saveJob })
.reduce({ fn: reduceFn, provides: 'count', seed: 100 })
.run({}, ['count'])
.reduce({ fn: reduceFn, provides: 'count', seed: 100, keep: [] })
.pipe({ fn: saveFn })
.run()

expect(result).toEqual({ count: 102 })
expect(saveFn).toHaveBeenCalledWith({ count: 102 })
})

test('Should call aggregator function with correct parameters', async () => {
Expand All @@ -32,31 +34,31 @@ describe('Reduce', () => {
expect(reduceFn).toHaveBeenCalledWith(101, { initial: true, job: { job_id: '2' } }, 1)
})

test('Should keep last values processed in reduce to the run output', async () => {
test('Should keep all last values processed by default (keep not defined)', async () => {
const generatorMock = getMockedJobGenerator(4)
const saveJob = jest.fn()
const reduceFn = jest.fn().mockImplementation((acc, bag) => (acc + Number(bag.job.job_id)))

const result = await fromGenerator({ fn: generatorMock, provides: 'job' })
.pipe({ fn: saveJob })
.reduce({ fn: reduceFn, provides: 'count', seed: 0, keep: ['initial', 'job'] })
.run({ initial: true }, ['initial', 'job', 'count'])
.reduce({ fn: reduceFn, provides: 'count', seed: 0 })
.run({ initial: true })

expect(result).toEqual({ initial: true, job: { job_id: '4' }, count: 10 })
})

test('Allows flow to continue after the reduce execution', async () => {
test('Allows flow to continue after the reduce execution, and pass only what is provided in keep', async () => {
const generatorMock = getMockedJobGenerator(4)
const saveCount = jest.fn()
const reduceFn = jest.fn().mockImplementation((acc, bag) => (acc + Number(bag.job.job_id)))

await fromGenerator({ fn: generatorMock, provides: 'job' })
.reduce({ fn: reduceFn, provides: 'count', seed: 0, keep: ['initial', 'job'] })
.reduce({ fn: reduceFn, provides: 'count', seed: 0, keep: ['count'] })
.pipe({ fn: saveCount })
.run({ initial: true }, ['initial', 'job', 'count'])
.run({ initial: true })

expect(saveCount).toBeCalledTimes(1)
expect(saveCount).toBeCalledWith({ initial: true, job: { job_id: '4' }, count: 10 })
expect(saveCount).toBeCalledWith({ count: 10 })
})

test('Reduce works fine when combined with backpressure', async () => {
Expand All @@ -79,10 +81,9 @@ describe('Reduce', () => {
fn: reduceFn,
provides: 'count',
seed: 0,
keep: ['initial', 'job'],
})
.pipe({ fn: saveCount, name: 'saveCount' })
.run({ initial: true }, ['initial', 'job', 'count'])
.run()

expect(onStep.mock.calls).toEqual([
[getOnStepFinishedParamsFixture({ name: 'generator' })],
Expand Down
16 changes: 8 additions & 8 deletions test/integration/subCaminho.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ describe('Sub-Caminho', () => {

await fromGenerator(companySteps.generator)
.pipe(companySteps.fetchStatus)
.pipe({ fn: (bag: ValueBag) => employeeCaminho.run(bag, ['count']), provides: 'employees' })
.pipe({ fn: employeeCaminho.run, provides: 'employees' })
.pipe(companySteps.saver)
.run()

Expand Down Expand Up @@ -59,7 +59,7 @@ describe('Sub-Caminho', () => {

await fromGenerator({ ...companySteps.generator }, { maxItemsFlowing: 2 })
.pipe(companySteps.fetchStatus)
.pipe({ fn: (bag: ValueBag) => employeeCaminho.run(bag, ['count']), provides: 'employees' })
.pipe({ fn: employeeCaminho.run, provides: 'employees' })
.pipe({ ...companySteps.saver, batch: { maxSize: 2, timeoutMs: 10 } })
.pipe(finalStepCompany)
.run()
Expand All @@ -85,8 +85,8 @@ describe('Sub-Caminho', () => {

await fromGenerator(companySteps.generator)
.pipe(companySteps.fetchStatus)
.pipe({ fn: (bag: ValueBag) => employeeCaminho.run(bag, ['count']), provides: 'employees' })
.pipe({ fn: (bag: ValueBag) => internCaminho.run(bag, ['count']), provides: 'interns' })
.pipe({ fn: employeeCaminho.run, provides: 'employees' })
.pipe({ fn: internCaminho.run, provides: 'interns' })
.pipe(companySteps.saver)
.run()

Expand All @@ -106,13 +106,13 @@ describe('Sub-Caminho', () => {

const employeeCaminho = fromGenerator(employeeSteps.generator)
.pipe(employeeSteps.mapper)
.pipe({ fn: (bag: ValueBag) => documentsCaminho.run(bag, ['count']), provides: 'savedDocuments' })
.pipe({ fn: documentsCaminho.run, provides: 'savedDocuments' })
.pipe(employeeSteps.saver)
.reduce(employeeSteps.accumulator)

await fromGenerator(companySteps.generator)
.pipe(companySteps.fetchStatus)
.pipe({ fn: (bag: ValueBag) => employeeCaminho.run(bag, ['count']), provides: 'savedEmployees' })
.pipe({ fn: employeeCaminho.run, provides: 'savedEmployees' })
.pipe(companySteps.saver)
.run()

Expand Down Expand Up @@ -156,7 +156,7 @@ function getEmployeeSteps() {
const generator = { fn: employeeGeneratorFn, provides: 'employeeName' }
const mapper = { fn: mapEmployeeFn, provides: 'mappedEmployee' }
const saver = { fn: saveEmployeeFn }
const accumulator: ReduceParams<number> = { fn: (acc: number) => acc + 1, seed: 0, provides: 'count' }
const accumulator: ReduceParams<number> = { fn: (acc: number) => acc + 1, seed: 0, provides: 'count', keep: [] }

return {
generator,
Expand All @@ -172,7 +172,7 @@ function getDocumentSteps() {

const generator = { fn: generatorFn, provides: 'documentId' }
const saver = { fn: saverFn }
const accumulator: ReduceParams<number> = { fn: (acc: number) => acc + 1, seed: 0, provides: 'count' }
const accumulator: ReduceParams<number> = { fn: (acc: number) => acc + 1, seed: 0, provides: 'count', keep: [] }

return {
generator,
Expand Down

0 comments on commit e93ff5b

Please sign in to comment.