Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions packages/orap/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,6 @@ orap.event(eventSignalParam)
.ttl({ taskTtl: 120000, doneTtl: 60000 })
.handle(handle2)

// set logger before listen
orap.logger(logger)

// start signal listeners
orap.listen(
{
Expand All @@ -130,8 +127,6 @@ Each `new Orap()` starts a `Orap Flow`
- optional: httpProvider, for crosscheck only, since crosscheck is based on getLogs
- `onListenFn`: customized hook when listener started.

**.logger(logger)**
- set which logger to use across this orap

#### Event Flow

Expand Down Expand Up @@ -302,4 +297,4 @@ etc.
### StorageManager
- a wrap class designed for caching tasks in Orap
- `store`: the store entity, currently provides 2 options: use memory or redis, checkout `orap/store`
- `queryDelay`: when doing retry-able operations, e.g. get all keys with the given prefix, this defines the interval between retries.
- `queryDelay`: when doing retry-able operations, e.g. get all keys with the given prefix, this defines the interval between retries.
1 change: 0 additions & 1 deletion packages/orap/examples/customDemo/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import { TransferTask } from './taskTransfer'

// new orap
const orap = new Orap()
orap.logger(logger)

let store: any
let sm: any
Expand Down
3 changes: 0 additions & 3 deletions packages/orap/examples/declarativeDemo/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,6 @@ export function startDemo(options: ListenOptions, storeConfig?: any) {
.ttl({ taskTtl: 20000, doneTtl: 20000 })
.handle(handleTask_2)

// set logger before listen
orap.logger(logger)

// start signal listener
orap.listen(
options,
Expand Down
3 changes: 0 additions & 3 deletions packages/orap/examples/raplizeSample/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,6 @@ orap.event(eventSignalParam)
.ttl({ taskTtl: 120000, doneTtl: 60000 })
.handle(handle)

// set logger before listen
orap.logger(logger)

// start signal listeners
orap.listen(
{
Expand Down
4 changes: 1 addition & 3 deletions packages/orap/src/beat/event.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import type { Logger } from '@ora-io/utils'
import type { AutoCrossCheckParam, Providers } from '@ora-io/reku'
import type { EventSignalCallback, EventSignalRegisterParams } from '../signal'
import { EventSignal } from '../signal'
Expand All @@ -11,12 +10,11 @@ export class EventBeat extends EventSignal {
constructor(
params: EventSignalRegisterParams,
callback: EventSignalCallback,
logger: Logger,
crosscheckOptions: Omit<AutoCrossCheckParam, 'address' | 'topics' | 'onMissingLog'> | undefined,
private subscribeProvider: Providers,
private crosscheckProvider: Providers | undefined,
) {
super(params, callback, logger, crosscheckOptions)
super(params, callback, crosscheckOptions)
}

drop() {
Expand Down
51 changes: 51 additions & 0 deletions packages/orap/src/flow/event.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import { ethers } from 'ethers'
import { beforeEach, describe, expect, it } from 'vitest'
import { OrapFlow } from './orap'
import { EventFlow } from './event'

describe('EventFlow', () => {
let orapFlow: OrapFlow
let eventFlow: EventFlow

beforeEach(() => {
orapFlow = new OrapFlow()
eventFlow = new EventFlow(orapFlow)
})

it('should create a task flow', () => {
const taskFlow = eventFlow.task()
expect(taskFlow).toBeDefined()
expect(eventFlow.taskFlows).toContain(taskFlow)
})

it('should set the subscribe provider', () => {
const provider = new ethers.JsonRpcProvider()
eventFlow.setSubscribeProvider(provider)
expect(eventFlow.subscribeProvider).toBe(provider)
})

it('should set the crosscheck provider', () => {
const provider = new ethers.JsonRpcProvider()
eventFlow.setCrosscheckProvider(provider)
expect(eventFlow.crosscheckProvider).toBe(provider)
})

it('should set the handle function', () => {
const handleFn = () => true
eventFlow.handle(handleFn)
expect(eventFlow.handleFn).toBe(handleFn)
})

it('should assemble the EventVerse', () => {
const taskFlow = eventFlow.task()
const eventVerse = eventFlow.assemble()
expect(eventVerse).toBeDefined()
expect(taskFlow).toBeDefined()
expect(eventFlow.taskFlows).toContain(taskFlow)
})

it('should return the parent OrapFlow', () => {
const parentFlow = eventFlow.another()
expect(parentFlow).toBe(orapFlow)
})
})
12 changes: 5 additions & 7 deletions packages/orap/src/flow/event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import type { Flow, HandleFn } from './interface'
import type { OrapFlow } from './orap'

export class EventFlow implements Flow {
private taskFlows: TaskFlow[] = []
private _taskFlows: TaskFlow[] = []

handleFn: HandleFn
partialCrosscheckOptions?: Omit<AutoCrossCheckParam, 'address' | 'topics' | 'onMissingLog'>
Expand All @@ -24,14 +24,12 @@ export class EventFlow implements Flow {
) {
// Default handleFn
this.handleFn = handleFn ?? (async (..._args: Array<any>) => {
const _contractEventPayload = _args.pop()
this.logger.debug('handle event signal', _contractEventPayload.log.transactionHash)
return true
})
}

get logger() {
return this.parentFlow!._logger
get taskFlows() {
return this._taskFlows
}

crosscheck(options?: Omit<AutoCrossCheckParam, 'address' | 'topics' | 'onMissingLog'>) {
Expand All @@ -46,7 +44,7 @@ export class EventFlow implements Flow {
tf.cache(sm)
if (context)
tf.context(context)
this.taskFlows.push(tf)
this._taskFlows.push(tf)
return tf
}

Expand Down Expand Up @@ -74,7 +72,7 @@ export class EventFlow implements Flow {
}

private _assembleTaskFlows(): TaskVerse[] {
return this.taskFlows.map(flow => flow.assemble())
return this._taskFlows.map(flow => flow.assemble())
}

// TODO: use _assemble? for ux?
Expand Down
36 changes: 36 additions & 0 deletions packages/orap/src/flow/orap.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import { ethers } from 'ethers'
import { beforeEach, describe, expect, it, vi } from 'vitest'
import { SEPOLIA_HTTP, SEPOLIA_WSS } from '../../tests/config'
import { OrapFlow } from './orap'

describe('OrapFlow', () => {
let wsProvider: any
let httpProvider: any
let orapFlow: OrapFlow

beforeEach(() => {
wsProvider = new ethers.WebSocketProvider(SEPOLIA_WSS)
httpProvider = new ethers.JsonRpcProvider(SEPOLIA_HTTP)
orapFlow = new OrapFlow()
})

it('should create an event flow', () => {
const eventFlow = orapFlow.event()
expect(eventFlow).toBeDefined()
expect(orapFlow.eventFlows).toContain(eventFlow)
})

it('should listen for events', () => {
const onListenFn = vi.fn()
orapFlow.listen({ wsProvider, httpProvider }, onListenFn)
expect(orapFlow.onListenFn).toBe(onListenFn)
})

it('should assemble the OrapVerse', () => {
const eventFlow = orapFlow.event()
const orapVerse = orapFlow.assemble()
expect(orapVerse).toBeDefined()
expect(eventFlow).toBeDefined()
expect(orapFlow.eventFlows).toContain(eventFlow)
})
})
11 changes: 3 additions & 8 deletions packages/orap/src/flow/orap.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import type { Logger } from '@ora-io/utils'
import { logger } from '@ora-io/utils'
import type { Providers } from '@ora-io/reku'
import { OrapVerse } from '../verse/orap'
import type { EventSignalRegisterParams } from '../signal'
Expand All @@ -20,11 +18,9 @@ export class OrapFlow implements Flow {
} = { event: [] }

onListenFn: any = () => { }
_logger: Logger = logger

logger(logger: Logger) {
this._logger = logger
return this
get eventFlows() {
return this.subflows.event
}

event(options?: EventSignalRegisterParams, handler?: any): EventFlow {
Expand Down Expand Up @@ -52,8 +48,7 @@ export class OrapFlow implements Flow {
}

assemble(): OrapVerse {
// const es = new EventSignal(options, fn, this.logger)
const eventVerses = this.subflows.event.map(flow => flow.assemble(), { logger: this.logger })
const eventVerses = this.subflows.event.map(flow => flow.assemble())
return new OrapVerse(this).setEventVerses(eventVerses)
// this.routes.event.push(es)
}
Expand Down
84 changes: 84 additions & 0 deletions packages/orap/src/flow/task.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import { beforeEach, describe, expect, it, vi } from 'vitest'
import { memoryStore } from '@ora-io/utils'
import { StoreManager } from '../store'
import { EventFlow, TaskFlow } from '.'

describe('TaskFlow', () => {
let parentFlow: any
let taskFlow: TaskFlow

beforeEach(() => {
parentFlow = new EventFlow()
taskFlow = new TaskFlow(parentFlow)
})

it('should set cache', () => {
const sm = new StoreManager(memoryStore())
const result = taskFlow.cache(sm)
expect(result).toBe(taskFlow)
expect(taskFlow.sm).toBe(sm)
})

it('should set context', () => {
const ctx = { key: 'value' }
const result = taskFlow.context(ctx)
expect(result).toBe(taskFlow)
expect(taskFlow.ctx).toBe(ctx)
})

it('should set prefix', () => {
const taskPrefix = 'Task:'
const donePrefix = 'Done-Task:'
const result = taskFlow.prefix(taskPrefix, donePrefix)
expect(result).toBe(taskFlow)
expect(taskFlow.taskPrefix).toBe(taskPrefix)
expect(taskFlow.donePrefix).toBe(donePrefix)
})

it('should set TTL', () => {
const TTLs = { taskTtl: 1000, doneTtl: 2000 }
const result = taskFlow.ttl(TTLs)
expect(result).toBe(taskFlow)
expect(taskFlow.taskTtl).toBe(TTLs.taskTtl)
expect(taskFlow.doneTtl).toBe(TTLs.doneTtl)
})

it('should set key', () => {
const toKey = () => 'key'
const result = taskFlow.key(toKey)
expect(result).toBe(taskFlow)
expect(taskFlow.toKeyFn).toBe(toKey)
})

it('should set handle', () => {
const handler = vi.fn()
const result = taskFlow.handle(handler)
expect(result).toBe(taskFlow)
expect(taskFlow.handleFn).toBe(handler)
})

it('should set success', () => {
const onSuccess = vi.fn()
const result = taskFlow.success(onSuccess)
expect(result).toBe(taskFlow)
expect(taskFlow.successFn).toBe(onSuccess)
})

it('should set fail', () => {
const onFail = vi.fn()
const result = taskFlow.fail(onFail)
expect(result).toBe(taskFlow)
expect(taskFlow.failFn).toBe(onFail)
})

it('should get another EventFlow', () => {
const result = taskFlow.another()
expect(result).toBe(parentFlow)
})

it('should assemble the TaskVerse', () => {
const taskVerse = taskFlow.assemble()
expect(taskVerse).toBeDefined()
expect(taskVerse.flow).toBe(taskFlow)
})
})
4 changes: 0 additions & 4 deletions packages/orap/src/flow/task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,6 @@ export class TaskFlow implements Flow {
return this.parentFlow!
}

get logger() {
return this.parentFlow!.logger
}

assemble(): TaskVerse {
return new TaskVerse(this)
}
Expand Down
Loading