Skip to content

Commit

Permalink
fix(remesh): fix the logic of garbage collection in domain.event and …
Browse files Browse the repository at this point in the history
…domain.query
  • Loading branch information
Lucifier129 committed Apr 1, 2024
1 parent 8d93b6d commit 73253a5
Showing 1 changed file with 89 additions and 46 deletions.
135 changes: 89 additions & 46 deletions packages/remesh/src/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ export type RemeshQueryStorage<T extends Args<Serializable>, U> = {
upstreamSet: Set<RemeshQueryStorage<any, any> | RemeshStateStorage<any>>
downstreamSet: Set<RemeshQueryStorage<any, any>>
subject: Subject<U>
observable: Observable<U>
refCount: number
innerRefCount: number
outerRefCount: number
status: 'default' | 'wip' | 'updated'
wipUpstreamSet: Set<RemeshQueryStorage<any, any> | RemeshStateStorage<any>>
}
Expand All @@ -78,7 +78,8 @@ export type RemeshEventStorage<T extends Args, U> = {
Event: RemeshEvent<T, U>
subject: Subject<T[0]>
observable: Observable<U>
refCount: number
innerRefCount: number
outerRefCount: number
}

export type RemeshDomainStorage<T extends RemeshDomainDefinition, U extends Args<Serializable>> = {
Expand Down Expand Up @@ -300,15 +301,12 @@ export const RemeshStore = (options?: RemeshStoreOptions) => {
}),
)

const observable = new Observable<U>((subscriber) => {
const subscription = observableMapToImplIfNeeded.subscribe(subscriber)
eventStorage.refCount += 1
const observable = new Observable<U>((observer) => {
const subscription = observableMapToImplIfNeeded.subscribe(observer)

return () => {
eventStorage.refCount -= 1
subscription.unsubscribe()
if (eventStorage.refCount === 0) {
clearEventStorageIfNeeded(eventStorage)
}
clearEventStorageIfNeeded(eventStorage)
}
})

Expand All @@ -320,7 +318,8 @@ export const RemeshStore = (options?: RemeshStoreOptions) => {
Event,
subject,
observable,
refCount: 0,
innerRefCount: 0,
outerRefCount: 0,
} as RemeshEventStorage<T, U>)

domainStorage.eventMap.set(Event, eventStorage)
Expand All @@ -345,7 +344,7 @@ export const RemeshStore = (options?: RemeshStoreOptions) => {
const key = getQueryStorageKey(queryAction)

const subject = new Subject<U>()
const observable = subject.asObservable()

const upstreamSet: RemeshQueryStorage<T, U>['upstreamSet'] = new Set()

const currentQueryStorage = {
Expand All @@ -357,8 +356,8 @@ export const RemeshStore = (options?: RemeshStoreOptions) => {
upstreamSet,
downstreamSet: new Set(),
subject,
observable,
refCount: 0,
innerRefCount: 0,
outerRefCount: 0,
status: 'default',
wipUpstreamSet: new Set(),
} as RemeshQueryStorage<T, U>
Expand Down Expand Up @@ -413,11 +412,9 @@ export const RemeshStore = (options?: RemeshStoreOptions) => {
}

const subject = new Subject<U>()
const observable = subject.asObservable()

queryStorage.status = 'default'
queryStorage.subject = subject
queryStorage.observable = observable
domainStorage.queryMap.set(queryStorage.key, queryStorage)

for (const upstream of queryStorage.upstreamSet) {
Expand Down Expand Up @@ -635,8 +632,16 @@ export const RemeshStore = (options?: RemeshStoreOptions) => {
const shouldClearQueryStorage = <T extends Args<Serializable>, U>(
queryStorage: RemeshQueryStorage<T, U>,
): boolean => {
if (queryStorage.refCount > 0) {
return false
const domainStorage = getDomainStorage(queryStorage.Query.owner)

if (domainStorage.refCount > 0) {
if (queryStorage.innerRefCount > 0 || queryStorage.outerRefCount > 0) {
return false
}
} else {
if (queryStorage.outerRefCount > 0) {
return false
}
}

if (queryStorage.downstreamSet.size !== 0) {
Expand Down Expand Up @@ -674,9 +679,18 @@ export const RemeshStore = (options?: RemeshStoreOptions) => {
}

const shouldClearEventStorage = <T extends Args, U>(eventStorage: RemeshEventStorage<T, U>): boolean => {
if (eventStorage.refCount > 0) {
return false
const domainStorage = getDomainStorage(eventStorage.Event.owner)

if (domainStorage.refCount > 0) {
if (eventStorage.innerRefCount > 0 || eventStorage.outerRefCount > 0) {
return false
}
} else {
if (eventStorage.outerRefCount > 0) {
return false
}
}

return true
}

Expand Down Expand Up @@ -752,13 +766,13 @@ export const RemeshStore = (options?: RemeshStoreOptions) => {
* so the domain resources can be cleared
*/
for (const queryStorage of domainStorage.queryMap.values()) {
if (queryStorage.refCount > 0) {
if (queryStorage.outerRefCount > 0) {
return false
}
}

for (const eventStorage of domainStorage.eventMap.values()) {
if (eventStorage.refCount > 0) {
if (eventStorage.outerRefCount > 0) {
return false
}
}
Expand Down Expand Up @@ -807,20 +821,41 @@ export const RemeshStore = (options?: RemeshStoreOptions) => {
throw new Error(`Unexpected input in ctx.get(..): ${input}`)
},
fromEvent: (Event) => {
if (Event.type === 'RemeshEvent') {
const eventStorage = getEventStorage(Event)
return eventStorage.observable
} else if (Event.type === 'RemeshSubscribeOnlyEvent') {
const OriginalEvent = internalToOriginalEvent(Event)
const eventStorage = getEventStorage(OriginalEvent)
return eventStorage.observable
}
const eventStorage =
Event.type === 'RemeshSubscribeOnlyEvent'
? getEventStorage(internalToOriginalEvent(Event))
: getEventStorage(Event)

const observable: Observable<any> = new Observable((subscriber) => {
const subscription = eventStorage.observable.subscribe(subscriber)

eventStorage.innerRefCount += 1

return () => {
eventStorage.innerRefCount -= 1
subscription.unsubscribe()
clearEventStorageIfNeeded(eventStorage)
}
})

throw new Error(`Unexpected input in fromEvent(..): ${Event}`)
return observable
},
fromQuery: (queryAction) => {
const queryStorage = getQueryStorage(queryAction)
return queryStorage.observable

const observable: Observable<any> = new Observable((subscriber) => {
const subscription = queryStorage.subject.subscribe(subscriber)

queryStorage.innerRefCount += 1

return () => {
queryStorage.innerRefCount -= 1
subscription.unsubscribe()
clearQueryStorageIfNeeded(queryStorage)
}
})

return observable
},
}

Expand Down Expand Up @@ -1145,16 +1180,15 @@ export const RemeshStore = (options?: RemeshStoreOptions) => {
): Subscription => {
const queryStorage = getQueryStorage(queryAction)

const observable = new Observable<U>((subscriber) => {
const observable: Observable<U> = new Observable((subscriber) => {
const subscription = queryStorage.subject.subscribe(subscriber)
queryStorage.refCount += 1

queryStorage.outerRefCount += 1

return () => {
queryStorage.refCount -= 1
queryStorage.outerRefCount -= 1
subscription.unsubscribe()
if (queryStorage.refCount === 0) {
clearQueryStorageIfNeeded(queryStorage)
}
clearQueryStorageIfNeeded(queryStorage)
}
})

Expand All @@ -1169,15 +1203,24 @@ export const RemeshStore = (options?: RemeshStoreOptions) => {
Event: RemeshEvent<T, U> | RemeshSubscribeOnlyEvent<T, U>,
subscriber: (event: U) => unknown,
): Subscription => {
if (Event.type === 'RemeshEvent') {
const eventStorage = getEventStorage(Event)
return eventStorage.observable.subscribe(subscriber)
} else if (Event.type === 'RemeshSubscribeOnlyEvent') {
const OriginalEvent = internalToOriginalEvent(Event)
return subscribeEvent(OriginalEvent, subscriber)
}
const eventStorage =
Event.type === 'RemeshSubscribeOnlyEvent'
? getEventStorage(internalToOriginalEvent(Event))
: getEventStorage(Event)

const observable = new Observable<U>((subscriber) => {
const subscription = eventStorage.observable.subscribe(subscriber)

throw new Error(`Unknown event type of ${Event}`)
eventStorage.outerRefCount += 1

return () => {
eventStorage.outerRefCount -= 1
subscription.unsubscribe()
clearEventStorageIfNeeded(eventStorage)
}
})

return observable.subscribe(subscriber)
}

const getDomain = <T extends RemeshDomainDefinition, U extends Args<Serializable>>(
Expand Down

0 comments on commit 73253a5

Please sign in to comment.