Skip to content
This repository has been archived by the owner on Aug 16, 2022. It is now read-only.

Commit

Permalink
Merge pull request #27 from AlexandrHoroshih/feat/22-compat
Browse files Browse the repository at this point in the history
fix: make ReEffect compatible with effector 22
  • Loading branch information
yumauri committed Sep 27, 2021
2 parents 71f30c4 + b630461 commit 94874ed
Show file tree
Hide file tree
Showing 10 changed files with 340 additions and 111 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ jobs:

strategy:
matrix:
node: ['10', '12', '14']
node: ['12', '14', '16']

name: Node ${{ matrix.node }}

Expand Down
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
"@pika/plugin-ts-standard-pkg": "^0.9.2",
"@size-limit/preset-small-lib": "^4.9.1",
"@types/jest": "^26.0.19",
"effector": "^21.8.11",
"effector": "^22.1.1",
"jest": "^26.6.3",
"pika-plugin-package.json": "^1.0.2",
"prettier": "^2.2.1",
Expand All @@ -85,9 +85,9 @@
"yaspeller": "^7.0.0"
},
"peerDependencies": {
"effector": "^21.2.0"
"effector": "^22.0.0"
},
"engines": {
"node": ">=10.17.0"
"node": ">=12.13.0"
}
}
40 changes: 21 additions & 19 deletions src/createReEffect.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import { createEffect as effectorCreateEffect, createEvent } from 'effector'
import {
createEffect as effectorCreateEffect,
createEvent,
createStore,
} from 'effector'
import { CreateReEffect, CreateReEffectConfig, ReEffect } from './types'
import { CancellablePromise } from './promise'
import { patchInstance } from './instance'
import { patchRunner } from './runner'
import { Strategy, TAKE_EVERY } from './strategy'
import { TAKE_EVERY } from './strategy'

/**
* High-order function over createEffect
Expand All @@ -18,6 +21,17 @@ export const createReEffectFactory = (
const instance = (createEffect as any)(nameOrConfig, maybeConfig)
const cancelled = (createEvent as any)({ named: 'cancelled' })
const cancel = (createEvent as any)({ named: 'cancel' })
// Separate instance of inFlight is created,
// because inFlight will be force-synchronized with internal runningCount
// Doing this with actual inFlight instance leads to crazy bugs like negative inFlight count
// Not doing this at all leads to changes in observable behaviour of ReEffect
const inFlightInternal = (createStore as any)(0, {
named: 'reeffectInFlight',
}).on(instance, s => s + 1)
const pendingInternal = inFlightInternal.map({
fn: amount => amount > 0,
named: 'reeffectPending',
})

// prettier-ignore
const config =
Expand All @@ -27,32 +41,20 @@ export const createReEffectFactory = (
? nameOrConfig
: {}

const running: CancellablePromise<Done>[] = []

const scope = {
strategy: config.strategy || TAKE_EVERY,
feedback: config.feedback || false,
limit: config.limit || Infinity,
timeout: config.timeout,
cancelled,
cancel,
running,
inFlight: instance.inFlight,

push: (promise: CancellablePromise<Done>) => running.push(promise),
unpush: (promise?: CancellablePromise<Done>) => {
if (promise) {
// `running` array should always contain `promise`
// no need to check for index === -1
running.splice(running.indexOf(promise), 1)
}
return running.length
},
cancelAll: (strategy?: Strategy) =>
running.map(promise => promise.cancel(strategy)),
inFlight: inFlightInternal,
pending: pendingInternal,
anyway: instance.finally,
}

patchRunner<Payload, Done, Fail>(instance.graphite.scope.runner, scope as any)
patchInstance<Payload, Done, Fail>(instance, scope)

return instance
}
126 changes: 111 additions & 15 deletions src/fork.spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { createDomain, forward, scopeBind } from 'effector'
import { fork, serialize, allSettled } from 'effector/fork'
import { createDomain, forward, scopeBind, guard } from 'effector'
import { fork, serialize, allSettled } from 'effector'
import { createReEffectFactory } from './createReEffect'
import { TAKE_FIRST, TAKE_LAST, QUEUE, RACE } from './strategy'

Expand Down Expand Up @@ -39,6 +39,67 @@ test('createReEffect resolves in fork by default', async () => {
test('createReEffect do not affect other forks', async () => {
const createReEffect = createReEffectFactory()

const app = createDomain()
const start = app.createEvent<number>()
const $store = app.createStore(0, { sid: '$store' })
const reeffect = createReEffect({
async handler(param: number, onCancel) {
await new Promise<void>((rs, rj) => {
let id = setTimeout(() => {
rs()
}, 5 * param)

onCancel(() => {
clearTimeout(id)
rj()
})
})

return 5 * param
},
})

$store.on(reeffect.done, (state, { result }) => state + result)

forward({
from: start,
to: reeffect,
})

guard({
source: start,
filter: p => p === 1000,
}).watch(p => {
const scopedCancel = scopeBind(reeffect.cancel)

setTimeout(() => scopedCancel(), p)
})

const scopeAlice = fork(app)
const scopeBob = fork(app)

await allSettled(start, {
scope: scopeAlice,
params: 10,
})

await allSettled(start, {
scope: scopeBob,
params: 1000,
})

expect(serialize(scopeAlice)).toMatchInlineSnapshot(`
Object {
"$store": 50,
}
`)

expect(serialize(scopeBob)).toMatchInlineSnapshot(`Object {}`)
})

test('createReEffect cancelled reeffect do not affect other forks', async () => {
const createReEffect = createReEffectFactory()

const app = createDomain()
const start = app.createEvent<number>()
const $store = app.createStore(0, { sid: '$store' })
Expand Down Expand Up @@ -251,11 +312,8 @@ test('createReEffect in scope: cancelled reeffect does not hanging up `allSettle
params: undefined,
})

expect(serialize(scope)).toMatchInlineSnapshot(`
Object {
"$store": 0,
}
`)
expect(scope.getState($store)).toEqual(0)
expect(serialize(scope)).toMatchInlineSnapshot(`Object {}`) // store is not changed, so it must be not serialized
})

test('createReEffect in scope: failed reeffect does not hanging up `allSettled` and resolves in scope correctly', async () => {
Expand Down Expand Up @@ -339,15 +397,49 @@ test('createReEffect in scope: multiple calls aren`t hanging up `allSettled`', a
params: undefined,
})

expect(cancelled).toBeCalledTimes(2)

expect(serialize(scope)).toMatchInlineSnapshot(`
Object {
"$store": 5,
}
`)
})

test('createReEffect in scope: handler for scope works', async () => {
const createReEffect = createReEffectFactory()

const app = createDomain()
const start = app.createEvent()
const $store = app.createStore(0, { name: '$store', sid: '$store' })
const reeffect = createReEffect({
async handler() {
return 5
},
sid: 'reeffect',
})

$store.on(reeffect.done, (state, { result }) => state + result)

forward({
from: start,
to: reeffect,
})

const scope = fork({
handlers: [[reeffect, async () => 7]],
})

await allSettled(start, {
scope,
params: undefined,
})

expect(serialize(scope)).toMatchInlineSnapshot(`
Object {
"$store": 7,
}
`)
})

test('createReEffect in scope: TAKE_EVERY', async () => {
const cancelled = jest.fn()

Expand Down Expand Up @@ -380,8 +472,6 @@ test('createReEffect in scope: TAKE_EVERY', async () => {
params: undefined,
})

expect(cancelled).toBeCalledTimes(2)

expect(serialize(scope)).toMatchInlineSnapshot(`
Object {
"$store": 5,
Expand Down Expand Up @@ -482,7 +572,11 @@ test('createReEffect in scope: QUEUE', async () => {
const $store = app.createStore(0, { name: '$store', sid: '$store' })
const reeffect = createReEffect<number, number>({
async handler(p) {
return new Promise<number>(resolve => setTimeout(() => resolve(p), 30))
return new Promise<number>(resolve =>
setTimeout(() => {
resolve(p)
}, 30)
)
},
strategy: QUEUE,
})
Expand All @@ -495,9 +589,13 @@ test('createReEffect in scope: QUEUE', async () => {
bindReeffect(1)
bindReeffect(2)
bindReeffect(3)
bindReeffect(4)
bindReeffect(5)
})

$store.on(reeffect.done, (state, { result }) => state + result)
$store.on(reeffect.done, (state, { result }) => {
return state + result
})

const scope = fork(app)

Expand All @@ -506,8 +604,6 @@ test('createReEffect in scope: QUEUE', async () => {
params: undefined,
})

expect(cancelled).toBeCalledTimes(2)

expect(serialize(scope)).toMatchInlineSnapshot(`
Object {
"$store": 5,
Expand Down
11 changes: 7 additions & 4 deletions src/instance.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,26 @@
import { Event, launch, Step, step } from 'effector'
import { Event, launch, Node as Step, step, Store } from 'effector'
import { CancelledPayload, MutableReEffect, ReEffectConfig } from './types'
import { defer } from './promise'
import { assign, own } from './tools'
import { assign, own, setMeta } from './tools'

interface InstanceNewEvents<Payload> {
readonly cancelled: Event<CancelledPayload<Payload>> & { graphite: Step }
readonly cancel: Event<void> & { graphite: Step }
readonly feedback: boolean
readonly inFlight: Store<number> & { graphite: Step }
readonly pending: Store<boolean> & { graphite: Step }
}

/**
* Patch effect, add new events and change direct call
*/
export const patchInstance = <Payload, Done, Fail>(
instance: MutableReEffect<Payload, Done, Fail>,
{ cancelled, cancel, feedback }: InstanceNewEvents<Payload>
{ cancelled, cancel, feedback, inFlight, pending }: InstanceNewEvents<Payload>
) => {
assign(instance, { cancelled, cancel })
assign(instance, { cancelled, cancel, inFlight, pending })
own(instance, [cancelled, cancel])
setMeta(cancelled, 'needFxCounter', 'dec')

// adjust create function, to be able to set strategy, alongside with params
instance.create = (paramsOrConfig, [strategyOrConfig]) => {
Expand Down
37 changes: 35 additions & 2 deletions src/promise.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import { createEvent, createStore, Scope } from 'effector'
import { CancelledError, TimeoutError } from './error'
import { Strategy } from './strategy'
import { assign } from './tools'
import { assign, read } from './tools'

export type CancellablePromise<T> = Promise<T> & {
cancel: (strategy?: Strategy) => void
cancel: (strategy?: Strategy | void) => void
}

/**
Expand Down Expand Up @@ -56,3 +57,35 @@ export const defer = <Done>(): {
deferred.req.catch(() => {})
return deferred
}

/**
* Creates running promises storage
* Store is automatically forked within the scope, which allows to use reeffect easily with fork api
*/
export const createRunning = <Done>() => {
// needed to catch the scope when cancelling manually
const cancelAllEv = createEvent<Strategy | void>({
sid: 'internal/cancelAll',
})
const $running = createStore<CancellablePromise<Done>[]>([], {
sid: 'internal/$running',
serialize: 'ignore',
})

$running.watch(cancelAllEv, (runs, strategy) =>
runs.forEach(p => p.cancel(strategy))
)

const push = (promise: CancellablePromise<Done>, scope?: Scope) => {
read(scope)($running).push(promise)
}
const unpush = (promise: CancellablePromise<Done>, scope?: Scope) => {
if (!promise) return
read(scope)($running).splice(read(scope)($running).indexOf(promise), 1)
}
const cancelAll = (strategy: Strategy | undefined, scope?: Scope) => {
read(scope)($running).forEach(p => p.cancel(strategy))
}

return { $running, push, unpush, cancelAll, cancelAllEv }
}

0 comments on commit 94874ed

Please sign in to comment.