Skip to content

Commit

Permalink
make system subscribable, and use snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
cevr committed Jan 25, 2024
1 parent 69fe023 commit afb5807
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 111 deletions.
6 changes: 2 additions & 4 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,11 @@ export { and, not, or, stateIn } from './guards.ts';
export { setup } from './setup.ts';
export type {
ActorSystem,
SystemSnapshot,
InspectedActorEvent,
InspectedEventEvent,
InspectedSnapshotEvent,
InspectionEvent,
RegisteredActorEvent,
UnregisteredActorEvent,
RegistrationEvent
InspectionEvent
} from './system.ts';
export { toPromise } from './toPromise.ts';
export {
Expand Down
124 changes: 50 additions & 74 deletions packages/core/src/system.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,7 @@ function createScheduledEventId(
}

export interface ActorSystem<T extends ActorSystemInfo>
extends Subscribable<
RegistrationEvent<
T['actors'][keyof T['actors']],
keyof T['actors'] & string
>
> {
extends Subscribable<SystemSnapshot> {
/**
* @internal
*/
Expand Down Expand Up @@ -87,15 +82,15 @@ export interface ActorSystem<T extends ActorSystemInfo>
event: AnyEventObject
) => void;
scheduler: Scheduler;
getSnapshot: () => {
_scheduledEvents: Record<string, ScheduledEvent>;
};
getSnapshot: () => SystemSnapshot;
/**
* @internal
*/
_snapshot: {
_scheduledEvents: Record<ScheduledEventId, ScheduledEvent>;
};
_updateSnapshot: (snapshot: SystemSnapshot) => void;
/**
* @internal
*/
_snapshot: SystemSnapshot;
start: () => void;
}

Expand All @@ -113,9 +108,7 @@ export function createSystem<T extends ActorSystemInfo>(
const keyedActors = new Map<keyof T['actors'], AnyActorRef | undefined>();
const reverseKeyedActors = new WeakMap<AnyActorRef, keyof T['actors']>();
const inspectionObservers = new Set<Observer<InspectionEvent>>();
const registrationObservers = new Set<
Observer<RegistrationEvent<T['actors'][keyof T['actors']]>>
>();
const systemObservers = new Set<Observer<SystemSnapshot>>();
const timerMap: { [id: ScheduledEventId]: number } = {};
const clock = options.clock;

Expand Down Expand Up @@ -169,23 +162,36 @@ export function createSystem<T extends ActorSystemInfo>(
}
};

function makeActorsFromChildren() {
const actors = {} as Record<keyof T['actors'], AnyActorRef>;
for (const [key, actorRef] of children) {
const systemId = reverseKeyedActors.get(actorRef);
if (systemId !== undefined) {
actors[systemId] = actorRef;
}
}
return actors;
}

const system: ActorSystem<T> = {
_snapshot: {
_scheduledEvents:
(options?.snapshot && (options.snapshot as any).scheduler) ?? {}
(options?.snapshot && (options.snapshot as any).scheduler) ?? {},
actors: makeActorsFromChildren()
},

_bookId: () => `x:${idCounter++}`,
_register: (sessionId, actorRef) => {
children.set(sessionId, actorRef);
const systemId = reverseKeyedActors.get(actorRef);
if (systemId !== undefined) {
const event = {
type: `@xstate.actor.register`,
systemId: systemId as string,
actorRef: actorRef as T['actors'][keyof T['actors']]
} as const;
registrationObservers.forEach((listener) => {
listener.next?.(event);
const currentSnapshot = system.getSnapshot();
system._updateSnapshot({
...currentSnapshot,
actors: {
...currentSnapshot.actors,
[systemId]: actorRef
}
});
}
return sessionId;
Expand All @@ -197,13 +203,13 @@ export function createSystem<T extends ActorSystemInfo>(
if (systemId !== undefined) {
keyedActors.delete(systemId);
reverseKeyedActors.delete(actorRef);
const event = {
type: `@xstate.actor.unregister`,
systemId: systemId as string,
actorRef: actorRef as T['actors'][keyof T['actors']]
} as const;
registrationObservers.forEach((listener) => {
listener.next?.(event);
const {
_scheduledEvents,
actors: { [systemId]: _, ...actors }
} = system.getSnapshot();
system._updateSnapshot({
_scheduledEvents,
actors
});
}
},
Expand All @@ -212,8 +218,8 @@ export function createSystem<T extends ActorSystemInfo>(
},
subscribe: (
nextListenerOrObserver:
| ((event: RegistrationEvent<T['actors'][keyof T['actors']]>) => void)
| Observer<RegistrationEvent<T['actors'][keyof T['actors']]>>,
| ((event: SystemSnapshot) => void)
| Observer<SystemSnapshot>,
errorListener?: (error: any) => void,
completeListener?: () => void
) => {
Expand All @@ -223,25 +229,11 @@ export function createSystem<T extends ActorSystemInfo>(
completeListener
);

if (rootActor._processingStatus !== ProcessingStatus.Stopped) {
registrationObservers.add(observer);
} else {
const snapshot = rootActor.getSnapshot();
switch (snapshot.status) {
case 'done':
try {
observer.complete?.();
} catch (err) {
reportUnhandledError(err);
}
break;
// can this error?
}
}
systemObservers.add(observer);

return {
unsubscribe: () => {
registrationObservers.delete(observer);
systemObservers.delete(observer);
}
};
},
Expand Down Expand Up @@ -280,9 +272,13 @@ export function createSystem<T extends ActorSystemInfo>(
},
scheduler,
getSnapshot: () => {
return {
_scheduledEvents: { ...system._snapshot._scheduledEvents }
};
return system._snapshot;
},
_updateSnapshot: (snapshot) => {
system._snapshot = snapshot;
systemObservers.forEach((listener) => {
listener.next?.(snapshot);
});
},
start: () => {
const scheduledEvets = system._snapshot._scheduledEvents;
Expand Down Expand Up @@ -332,27 +328,7 @@ export type InspectionEvent =
| InspectedEventEvent
| InspectedActorEvent;

export interface RegisteredActorEvent<
TActorRef extends AnyActorRef,
TSystemId extends string = string
> {
type: `@xstate.actor.register`;
systemId: TSystemId;
actorRef: TActorRef;
export interface SystemSnapshot {
_scheduledEvents: Record<ScheduledEventId, ScheduledEvent>;
actors: Record<string, AnyActorRef>;
}

export interface UnregisteredActorEvent<
TActorRef extends AnyActorRef,
TSystemId extends string = string
> {
type: `@xstate.actor.unregister`;
systemId: TSystemId;
actorRef: TActorRef;
}

export type RegistrationEvent<
TActorRef extends AnyActorRef = AnyActorRef,
TSystemId extends string = string
> =
| RegisteredActorEvent<TActorRef, TSystemId>
| UnregisteredActorEvent<TActorRef, TSystemId>;
62 changes: 29 additions & 33 deletions packages/core/test/system.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -565,23 +565,23 @@ describe('system', () => {

const actorRef = createActor(machine).start();

const events: string[] = [];
let keysOverTime: string[][] = [
Object.keys(actorRef.system.getSnapshot().actors)
];

actorRef.system.subscribe((event) => {
events.push(`${event.type}.${event.systemId}`);
actorRef.system.subscribe((snapshot) => {
keysOverTime.push(Object.keys(snapshot.actors));
});

actorRef.send({ type: 'to_b' });
expect(events).toEqual([
`@xstate.actor.unregister.${aSystemId}`,
`@xstate.actor.register.${bSystemId}`
]);
expect(keysOverTime).toEqual([['a_child'], [], ['b_child']]);
actorRef.send({ type: 'to_a' });
expect(events).toEqual([
`@xstate.actor.unregister.${aSystemId}`,
`@xstate.actor.register.${bSystemId}`,
`@xstate.actor.unregister.${bSystemId}`,
`@xstate.actor.register.${aSystemId}`
expect(keysOverTime).toEqual([
['a_child'],
[],
['b_child'],
[],
['a_child']
]);
});

Expand Down Expand Up @@ -614,39 +614,35 @@ describe('system', () => {

const actorRef = createActor(machine).start();

const events: string[] = [];
const unsubscribedEvents: string[] = [];
const keysOverTime: string[][] = [
Object.keys(actorRef.system.getSnapshot().actors)
];
const unsubscribedKeysOverTime: string[][] = [
Object.keys(actorRef.system.getSnapshot().actors)
];

const subscription = actorRef.system.subscribe((event) => {
events.push(`${event.type}.${event.systemId}`);
keysOverTime.push(Object.keys(event.actors));
});

actorRef.system.subscribe((event) => {
unsubscribedEvents.push(`${event.type}.${event.systemId}`);
unsubscribedKeysOverTime.push(Object.keys(event.actors));
});

actorRef.send({ type: 'to_b' });
expect(events).toEqual([
`@xstate.actor.unregister.${aSystemId}`,
`@xstate.actor.register.${bSystemId}`
]);
expect(unsubscribedEvents).toEqual([
`@xstate.actor.unregister.${aSystemId}`,
`@xstate.actor.register.${bSystemId}`
]);
expect(keysOverTime).toEqual([['a_child'], [], ['b_child']]);
expect(unsubscribedKeysOverTime).toEqual([['a_child'], [], ['b_child']]);

subscription.unsubscribe();
actorRef.send({ type: 'to_a' });

expect(events).toEqual([
`@xstate.actor.unregister.${aSystemId}`,
`@xstate.actor.register.${bSystemId}`
]);
expect(unsubscribedEvents).toEqual([
`@xstate.actor.unregister.${aSystemId}`,
`@xstate.actor.register.${bSystemId}`,
`@xstate.actor.unregister.${bSystemId}`,
`@xstate.actor.register.${aSystemId}`
expect(keysOverTime).toEqual([['a_child'], [], ['b_child']]);
expect(unsubscribedKeysOverTime).toEqual([
['a_child'],
[],
['b_child'],
[],
['a_child']
]);
});
});

0 comments on commit afb5807

Please sign in to comment.