Skip to content

Commit 569013b

Browse files
committed
feat: implement stop and restart functionality for EventVerse, OrapVerse, and TaskVerse; enhance event handling in app.ts
1 parent b56e5aa commit 569013b

File tree

15 files changed

+394
-14
lines changed

15 files changed

+394
-14
lines changed

packages/orap/examples/declarativeDemo/app.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ export function startDemo(options: ListenOptions, storeConfig?: any) {
2424

2525
const toKey: ToKeyFn = (from: string, _to: string, _amount: number) => `${from}_${randomStr(4)}`
2626

27-
orap.event(eventSignalParam.address, eventSignalParam.abi, eventSignalParam.eventName)
27+
const event = orap.event(eventSignalParam.address, eventSignalParam.abi, eventSignalParam.eventName)
2828
.crosscheck({
2929
store,
3030
storeKeyPrefix: 'ora-stack:orap:demo:cc:',
@@ -37,8 +37,8 @@ export function startDemo(options: ListenOptions, storeConfig?: any) {
3737
// event hook, not necessary
3838
.handle(newEventSignalHook)
3939

40-
// add a task
41-
.task()
40+
// add a task
41+
event.task()
4242
.cache(sm)
4343
.key(toKey)
4444
.prefix('ora-stack:orap:demo:TransferTask:', 'ora-stack:orap:demo:Done-TransferTask:')
@@ -53,6 +53,8 @@ export function startDemo(options: ListenOptions, storeConfig?: any) {
5353
.ttl({ taskTtl: 20000, doneTtl: 20000 })
5454
.handle(handleTask_2)
5555

56+
event.stop()
57+
5658
// start signal listener
5759
orap.listen(
5860
options,

packages/orap/src/beat/event.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,8 @@ export class EventBeat extends EventSignal {
2020
drop() {
2121
this.listen(this.subscribeProvider, this.crosscheckProvider)
2222
}
23+
24+
stop() {
25+
super.stop()
26+
}
2327
}

packages/orap/src/flow/event.test.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { ethers } from 'ethers'
2-
import { beforeEach, describe, expect, it } from 'vitest'
2+
import { beforeEach, describe, expect, it, vi } from 'vitest'
3+
import { EventVerse } from '../verse/event'
34
import { OrapFlow } from './orap'
45
import { EventFlow } from './event'
56

@@ -48,4 +49,11 @@ describe('EventFlow', () => {
4849
const parentFlow = eventFlow.another()
4950
expect(parentFlow).toBe(orapFlow)
5051
})
52+
53+
it('should stop the EventVerse', () => {
54+
const stopFn = vi.fn()
55+
vi.spyOn(EventVerse.prototype, 'stop').mockImplementation(stopFn)
56+
eventFlow.stop()
57+
expect(stopFn).toHaveBeenCalled()
58+
})
5159
})

packages/orap/src/flow/event.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,12 @@ export class EventFlow implements Flow {
1818
private _subscribeProvider?: Providers
1919
private _crosscheckProvider?: Providers
2020

21+
private _verse: EventVerse = new EventVerse(this)
22+
23+
get verse() {
24+
return this._verse
25+
}
26+
2127
constructor(
2228
private parentFlow?: OrapFlow,
2329
public params?: EventSignalRegisterParams,
@@ -91,4 +97,12 @@ export class EventFlow implements Flow {
9197
another(): OrapFlow {
9298
return this.parentFlow!
9399
}
100+
101+
stop() {
102+
this._verse.stop()
103+
}
104+
105+
restart() {
106+
this._verse.restart()
107+
}
94108
}

packages/orap/src/flow/orap.test.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { ethers } from 'ethers'
22
import { beforeEach, describe, expect, it, vi } from 'vitest'
33
import { SEPOLIA_HTTP, SEPOLIA_WSS } from '../../tests/config'
4+
import { OrapVerse } from '../verse/orap'
45
import { OrapFlow } from './orap'
56

67
describe('OrapFlow', () => {
@@ -33,4 +34,11 @@ describe('OrapFlow', () => {
3334
expect(eventFlow).toBeDefined()
3435
expect(orapFlow.eventFlows).toContain(eventFlow)
3536
})
37+
38+
it('should stop the OrapVerse', () => {
39+
const stopFn = vi.fn()
40+
vi.spyOn(OrapVerse.prototype, 'stop').mockImplementation(stopFn)
41+
orapFlow.stop()
42+
expect(stopFn).toHaveBeenCalled()
43+
})
3644
})

packages/orap/src/flow/orap.ts

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,12 @@ export class OrapFlow implements Flow {
2727
return this.subflows.event
2828
}
2929

30+
private _verse: OrapVerse = new OrapVerse(this)
31+
32+
get verse() {
33+
return this._verse
34+
}
35+
3036
event(params: EventSignalRegisterParams, handler?: HandleFn): EventFlow
3137
event(address: ContractAddress, abi: Interface | InterfaceAbi | HandleFn, eventName: string, handler?: HandleFn): EventFlow
3238
event(params: EventSignalRegisterParams | ContractAddress, abi?: Interface | InterfaceAbi | HandleFn, eventName?: string, handler?: HandleFn): EventFlow {
@@ -54,15 +60,23 @@ export class OrapFlow implements Flow {
5460

5561
if (onListenFn)
5662
this.onListenFn = onListenFn
63+
const eventVerses = this.subflows.event.map(flow => flow.verse)
64+
this._verse.setEventVerses(eventVerses)
5765

58-
const orapVerse = this.assemble()
59-
orapVerse.play()
66+
this._verse.play()
6067
this.onListenFn()
6168
}
6269

70+
stop() {
71+
this._verse.stop()
72+
}
73+
74+
restart() {
75+
this._verse.restart()
76+
}
77+
6378
assemble(): OrapVerse {
6479
const eventVerses = this.subflows.event.map(flow => flow.assemble())
6580
return new OrapVerse(this).setEventVerses(eventVerses)
66-
// this.routes.event.push(es)
6781
}
6882
}

packages/orap/src/flow/task.ts

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,11 @@ const defaultHandleFn: HandleFn = () => {
1616
}
1717

1818
const defaultToKeyFn: ToKeyFn = _ => randomStr(8, alphabetHex)
19+
20+
const defaultFailFn: HandleResultFn = async (task: TaskRaplized) => {
21+
await task.remove()
22+
}
23+
1924
export interface TaskFlowTTL { taskTtl: Milliseconds; doneTtl: Milliseconds }
2025

2126
export interface TaskFlowParams {
@@ -41,15 +46,17 @@ export class TaskFlow implements Flow {
4146
toKeyFn: ToKeyFn = defaultToKeyFn
4247
handleFn: HandleFn = defaultHandleFn
4348
successFn: HandleResultFn = defaultSuccessFn
49+
failFn: HandleResultFn = defaultFailFn
4450

4551
private _middlewares: Array<HandleFn> = []
46-
47-
failFn: HandleResultFn = async (task: TaskRaplized) => {
48-
await task.remove()
49-
}
52+
private _verse: TaskVerse = new TaskVerse(this)
5053

5154
ctx?: Context
5255

56+
get verse() {
57+
return this._verse
58+
}
59+
5360
constructor(
5461
private parentFlow: EventFlow,
5562
params?: TaskFlowParams,
@@ -136,4 +143,12 @@ export class TaskFlow implements Flow {
136143
assemble(): TaskVerse {
137144
return new TaskVerse(this)
138145
}
146+
147+
stop() {
148+
this._verse.stop()
149+
}
150+
151+
restart() {
152+
this._verse.restart()
153+
}
139154
}

packages/orap/src/signal/event.test.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,4 +156,14 @@ describe('EventSignal', () => {
156156
expect(eventSignal.crosschecker?.start).toHaveBeenCalledWith(eventSignal.crosscheckerOptions)
157157
})
158158
})
159+
160+
describe('stop', () => {
161+
it('should stop the event listener and the crosschecker', () => {
162+
eventSignal.stopEventListener = vi.fn()
163+
eventSignal.stopCrossChecker = vi.fn()
164+
eventSignal.stop()
165+
expect(eventSignal.stopEventListener).toHaveBeenCalled()
166+
expect(eventSignal.stopCrossChecker).toHaveBeenCalledWith()
167+
})
168+
})
159169
})

packages/orap/src/signal/event.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,23 @@ export class EventSignal implements Signal {
126126
}
127127
}
128128

129+
stop() {
130+
this.stopEventListener()
131+
this.stopCrossChecker()
132+
}
133+
134+
stopEventListener() {
135+
if (this.provider instanceof RekuProviderManager)
136+
this.provider.destroy()
137+
138+
else
139+
this.contract.removeListener(this.params.eventName, this.subscribeCallback)
140+
}
141+
142+
stopCrossChecker() {
143+
this.crosschecker?.stop()
144+
}
145+
129146
async startCrossChecker(provider?: Providers) {
130147
if (this.crosscheckerParams?.disabled)
131148
return

packages/orap/src/verse/event.test.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,4 +73,24 @@ describe('EventVerse', () => {
7373

7474
expect(eventBeatDrop).toHaveBeenCalled()
7575
})
76+
77+
it('should stop event verse', () => {
78+
const eventBeatStop = vi.fn()
79+
vi.spyOn(EventBeat.prototype, 'stop').mockImplementation(eventBeatStop)
80+
81+
// Mock the _play method to avoid actual EventBeat creation
82+
vi.spyOn(eventVerse as any, '_play').mockImplementation(() => {
83+
// Create a mock eventBeat
84+
(eventVerse as any).eventBeat = {
85+
stop: eventBeatStop,
86+
}
87+
})
88+
89+
// Call play first to create the eventBeat
90+
eventVerse.play()
91+
// Then call stop
92+
eventVerse.stop()
93+
94+
expect(eventBeatStop).toHaveBeenCalled()
95+
})
7696
})

0 commit comments

Comments
 (0)