Does "applyPatch" suppose to fire "onPatch"? #586
Yes, it is supposed to, as there may be other "onPatch" listeners interested in the replayed value. We recommend using a scoped boolean as a semaphore to let the code know whether it is currently reapplying patches :)
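A minimal sketch of the scoped-boolean idea, for illustration only. `TinyStore` is a hypothetical stand-in that fires listeners on every applied patch; it is not the mobx-state-tree API, and `isReplaying`, `replayRemotePatch` and `sentToServer` are made-up names. The flag only covers work that happens synchronously while the patch is applied.

```typescript
// Hypothetical stand-in for a patch-emitting store; NOT the mobx-state-tree API.
type JsonPatch = { op: "add" | "remove" | "replace"; path: string; value?: unknown };
type PatchListener = (patch: JsonPatch) => void;

class TinyStore {
  private listeners: PatchListener[] = [];
  readonly state: Record<string, unknown> = {};

  onPatch(listener: PatchListener): void {
    this.listeners.push(listener);
  }

  // Applies a patch to a flat state object and notifies every listener.
  applyPatch(patch: JsonPatch): void {
    const key = patch.path.slice(1); // "/title" -> "title"
    if (patch.op === "remove") delete this.state[key];
    else this.state[key] = patch.value;
    for (const listener of this.listeners) listener(patch);
  }
}

const store = new TinyStore();
const sentToServer: JsonPatch[] = [];

// The scoped boolean: true only while a remote patch is being replayed.
let isReplaying = false;

store.onPatch((patch) => {
  if (isReplaying) return; // do not echo replayed patches back out
  sentToServer.push(patch);
});

function replayRemotePatch(patch: JsonPatch): void {
  isReplaying = true;
  try {
    store.applyPatch(patch);
  } finally {
    isReplaying = false; // reset even if applying the patch throws
  }
}

store.applyPatch({ op: "replace", path: "/title", value: "local edit" });
replayRemotePatch({ op: "replace", path: "/title", value: "remote edit" });
console.log(sentToServer.length); // 1: only the local edit is queued
```

Note that every patch emitted while the flag is set is dropped, including patches that are side effects of the replayed one.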
|
@mattiamanzati: The problem with scoped booleans is that they will ignore any patches that are generated as a side effect of applying the patch (e.g. applying the patch causes a component to unmount, which in turn saves some state and thus generates another patch as a side effect of the first one). I'm currently working on a project that synchronizes state across multiple devices (a similar set of concerns to #1263), and preventing clients and the server from re-emitting patches that they receive is turning out to be a hassle. This is really a shame given how fantastic a fit MST is for the problem, and how common I imagine this use case is. Is it possible to add a (possibly optional) flag to applyPatch to toggle this behavior? Something along the lines of:
|
@Ghirigoro Did you ever figure out a solution for this, perhaps using the filter in |
@BrianHung: I have no idea what
It's been a while since I've looked at that code, and in general I've drifted away from MST, so be careful with the following code. What follows is the general approach I've taken in the past to synchronizing patches.
At a high level, the idea is that you keep track of "external" patches (e.g. ones that are coming from other instances of your model) and "internal" patches (i.e. ones that have been generated within your instance and need to be propagated out).
import { addDisposer, applyPatch, onPatch, onSnapshot, type IJsonPatch } from "mobx-state-tree";
////////////////////////////////////////////////////////////////////////////
// INCOMING PATCHES
////////////////////////////////////////////////////////////////////////////
// This tracks the patches that are incoming externally. When we apply a patch
// we only want to apply it if it IS incoming, and when figuring out what patches
// to send we only want to send patches that are NOT incoming.
// NOTE: Basically all this code is just implementing a set. JS sets don't permit
// you to define equality (which we need) so we have to do it ourselves.
let incomingPatches: IJsonPatch[] | null = null;
const arePatchesEqual = (a: IJsonPatch, b: IJsonPatch): boolean => {
  // We need to do a deep equality check here.
  // THIS IS NOT A GOOD IMPLEMENTATION OF THAT.
  // You'll want to use something like lodash's isEqual, or some other deep equality
  // implementation.
  // If all this seems like a lot of wasted processing, it's because it is ;)
  // You could also add a mechanism to give each patch a unique ID and then use
  // that to compare them. That's out of scope for this example though.
  return JSON.stringify(a) === JSON.stringify(b);
};
const isIncoming = (p: IJsonPatch): boolean => {
  if (incomingPatches === null) return false;
  for (let i = 0; i < incomingPatches.length; i++) {
    if (arePatchesEqual(incomingPatches[i], p)) return true;
  }
  return false;
};
////////////////////////////////////////////////////////////////////////////
// OUTGOING PATCHES
////////////////////////////////////////////////////////////////////////////
// This tracks the patches that are outgoing.
const outgoingPatches: IJsonPatch[] = [];
const sendPatches = (patches: IJsonPatch[]) => {
  if (patches.length === 0) return;
  // TODO: Whatever it is you do to send patches...
};
////////////////////////////////////////////////////////////////////////////
// STORE
////////////////////////////////////////////////////////////////////////////
// NOTE: Store is your MST model (StoreModel, Store and StoreSnapOut come from
// your own code).
const createSyncedStore = (snap: StoreSnapOut): Store => {
  const store = StoreModel.create(snap);
  // When we receive a patch from onPatch, we register it to be processed
  // UNLESS it is an incoming patch (i.e. from an external source). If it is
  // then we can ignore it.
  const patchDisposer = onPatch(store, (patch: IJsonPatch) => {
    if (!isIncoming(patch)) outgoingPatches.push(patch);
  });
  // Snapshotting is how we detect changes. Snapshots are a kind of
  // transaction so we can detect when ALL the changes have been applied.
  // Then we can figure out what patches to send back out...
  const snapDisposer = onSnapshot(store, () => {
    // If there were changes that triggered this snapshot,
    // but there are no outgoing patches it means that the
    // changes were all external and we don't need to send
    // them back out.
    if (outgoingPatches.length === 0) return;
    // If there are outgoing patches, it means there were at
    // least some internal changes that we want to synchronize.
    // splice(0) empties the queue and hands sendPatches its own copy,
    // so clearing the queue cannot affect a send that is still in flight.
    sendPatches(outgoingPatches.splice(0));
  });
  addDisposer(store, () => {
    snapDisposer();
    patchDisposer();
  });
  return store;
};
////////////////////////////////////////////////////////////////////////////
// NETWORK PATCHES
////////////////////////////////////////////////////////////////////////////
// This is how we apply patches that we receive from the network.
const handleExternalPatch = (store: Store, patches: IJsonPatch[]) => {
  // Set the incoming patches so that we can ignore them in onPatch.
  incomingPatches = patches;
  try {
    // Apply the patches to the store. This should apply all the patches
    // even transitive ones that were generated by the patch, and then
    // trigger the onSnapshot handler.
    applyPatch(store, patches);
  } finally {
    // Clear the incoming patches now that we're done, even if applying threw.
    incomingPatches = null;
  }
}; |
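As the comments in `arePatchesEqual` warn, comparing `JSON.stringify` output is fragile: two patches with the same content but a different key order serialize differently. The following is a small, dependency-free sketch of a key-order-insensitive comparison; `JsonValue` and `deepEqualJson` are illustrative names, and a library such as lodash's `isEqual` would normally be the better choice.

```typescript
type JsonValue = null | boolean | number | string | JsonValue[] | { [key: string]: JsonValue };

// Structural equality for JSON values that ignores object key order.
function deepEqualJson(a: JsonValue, b: JsonValue): boolean {
  if (a === b) return true;
  if (a === null || b === null || typeof a !== "object" || typeof b !== "object") return false;
  if (Array.isArray(a) || Array.isArray(b)) {
    // Arrays only equal arrays of the same length with pairwise-equal items.
    if (!Array.isArray(a) || !Array.isArray(b) || a.length !== b.length) return false;
    return a.every((item, i) => deepEqualJson(item, b[i]));
  }
  const keysA = Object.keys(a);
  const keysB = Object.keys(b);
  if (keysA.length !== keysB.length) return false;
  return keysA.every(
    (k) => Object.prototype.hasOwnProperty.call(b, k) && deepEqualJson(a[k], b[k]),
  );
}

const first = { op: "replace", path: "/pos", value: { x: 1, y: 2 } };
const second = { op: "replace", path: "/pos", value: { y: 2, x: 1 } };

console.log(JSON.stringify(first) === JSON.stringify(second)); // false
console.log(deepEqualJson(first, second)); // true
```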
After thinking about this problem a bit more, I've built a Radix Trie to filter and compress patches. The TL;DR is that instead of checking whether every new patch is equal to a patch being applied, we check whether it exists in a tree by its path. We also check whether a patch is a subset of a patch being applied.
Here's how I'm using it to sync patches:
function isRootApplyPatches(call): boolean {
  return call.id === call.rootId && call.name === "@APPLY_PATCHES";
}
const atomicDispatch = createActionTrackingMiddleware2<{
  recorder: IPatchRecorder;
  patchTrie: PatchRadixTrie | undefined;
}>({
  filter: call => {
    if (call.env) {
      return false;
    }
    return true;
  },
  onStart: call => {
    const recorder = recordPatches(call.tree, (_patch, _inversePatch, context) => {
      // An effect is a patch created indirectly from calling applyPatches.
      // To filter for effects, need to dedupe applied patches.
      filterEffects: if (isRootApplyPatches(call) && !!context) {
        const patch = getImmerPatch(_patch);
        const node = call.env!.patchTrie!.find(patch);
        if (!node || !node.patch) break filterEffects;
        const patchComparator: Record<string, (a: Patch, b: Patch) => boolean> = {
          "@APPLY_SNAPSHOT": isPatchSubset,
          "@APPLY_PATCHES": isEqual,
        };
        if (patchComparator[context.name]?.(node.patch, patch)) return false;
      }
      return !!context && isActionContextThisOrChildOf(context, call.id);
    });
    recorder.resume();
    // Create a patch radix trie to deduplicate patches when applyPatches.
    const patchTrie = isRootApplyPatches(call) ? new PatchRadixTrie(call.args[0].map(getImmerPatch)) : undefined;
    call.env = {
      recorder,
      patchTrie,
    };
  },
  onFinish: (call, error) => {
    const { recorder } = call.env!;
    recorder.stop();
    if (error !== undefined) {
      console.warn("Error in atomic dispatch:", error, call);
      recorder.undo();
    } else {
      const { patches, reversedInversePatches: reverse } = recorder;
      // Ignore actions that resulted in no-op.
      if (patches.length === 0) return;
      const writePatches = this.replicache?.mutate.writePatches;
      writePatches?.(patches);
      // Exclude applyPatches(state, patches) from undo stack.
      // TODO: Create MST-based undo manager.
      if (isRootApplyPatches(call)) return;
      // Checks if action purely affects client state.
      const isOnlyClientUpdate =
        getType(call.context) === Client ||
        hasParentOfType(call.context, Client) ||
        patches.every(patch => patch.path.startsWith("/clients/"));
      const docChanged = !isOnlyClientUpdate;
      if (docChanged) {
        this.undoManager.add({
          // Push patches of a group onto queue to be executed
          // in a single transaction.
          redo: () => {
            this.redoPatches.push(...patches);
          },
          undo: () => {
            this.undoPatches.push(...reverse);
          },
        });
      }
    }
    call.env = undefined;
  },
});

import isEqual from "lodash/isEqual";
import { joinJsonPath, splitJsonPath, type IJsonPatch, isStateTreeNode, getSnapshot } from "mobx-state-tree";
// Assumption: Patch is Immer's patch type, whose path is an array of segments.
import type { Patch } from "immer";
export const getImmerPatch = ({ path, ...patch }: IJsonPatch) => ({ path: splitJsonPath(path), ...patch });
export const getMSTPatch = ({ path, ...patch }: Patch) => ({ path: joinJsonPath(path as string[]), ...patch });
export function isPatchEqual(a: Patch, b: Patch) {
  return isEqual(a.path, b.path) && a.op === b.op && isEqual(a.value, b.value);
}
/**
 * Finds index at which arrays differ. Returns -1 if they are equal or one contains the other.
 */
function splitIndex(partsA: Patch["path"], partsB: Patch["path"]) {
  // Only the shared prefix of the two paths can be compared.
  const sharedLength = Math.min(partsA.length, partsB.length);
  for (let i = 0; i < sharedLength; i++) {
    if (partsA[i] !== partsB[i]) {
      return i;
    }
  }
  return -1;
}
function getValueByPath(obj: any, path: Patch["path"]): any {
  let value = obj;
  for (const p of path) {
    if (value && typeof value === "object" && p in value) {
      value = value[p];
    } else {
      return undefined; // path not found in value
    }
  }
  return value;
}
/**
 * Checks if patch b is a subset of a, which means
 * b can be generated as a result of applying a.
 */
export function isPatchSubset(a: Patch, b: Patch) {
  // b can only be a subset of a if a's path is a strict prefix of b's path,
  // so a's path must be shorter than b's.
  if (a.path.length >= b.path.length) return false;
  const index = splitIndex(a.path, b.path);
  if (index !== -1) return false;
  if (a.op === "replace" && (b.op === "add" || b.op === "remove")) {
    return true;
  }
  if (a.op === b.op) {
    // Look up the part of a's value that b's longer path points at.
    const path = b.path.slice(a.path.length);
    const value = getValueByPath(a.value, path);
    return isEqual(value, b.value);
  }
  return false;
} |
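The `PatchRadixTrie` class itself is not shown in the comment above. As a rough illustration of looking patches up by path instead of comparing against every applied patch, here is a plain (uncompressed) trie keyed by path segments. `PatchPathTrie`, `PathPatch` and `findNearest` are hypothetical names, and this is an assumption about the general idea, not the author's implementation.

```typescript
type PathPatch = { op: "add" | "remove" | "replace"; path: (string | number)[]; value?: unknown };

class TrieNode {
  children = new Map<string, TrieNode>();
  patch: PathPatch | undefined;
}

// Hypothetical, uncompressed trie; a radix trie would additionally merge
// chains of single-child nodes.
class PatchPathTrie {
  private root = new TrieNode();

  constructor(patches: PathPatch[] = []) {
    for (const patch of patches) this.insert(patch);
  }

  insert(patch: PathPatch): void {
    let node = this.root;
    for (const segment of patch.path) {
      const key = String(segment);
      let child = node.children.get(key);
      if (!child) {
        child = new TrieNode();
        node.children.set(key, child);
      }
      node = child;
    }
    node.patch = patch; // a later patch at the same path overwrites the earlier one
  }

  // Returns the stored patch whose path is the longest prefix of (or equal to) `path`.
  findNearest(path: (string | number)[]): PathPatch | undefined {
    let node = this.root;
    let nearest = node.patch;
    for (const segment of path) {
      const child = node.children.get(String(segment));
      if (!child) break;
      node = child;
      if (node.patch) nearest = node.patch;
    }
    return nearest;
  }
}

const trie = new PatchPathTrie([
  { op: "replace", path: ["todos", 0], value: { title: "a", done: false } },
]);

console.log(trie.findNearest(["todos", 0, "done"])?.op); // "replace"
console.log(trie.findNearest(["users", 1])); // undefined
```

A lookup costs time proportional to the path depth rather than to the number of applied patches, and the returned ancestor patch is the natural candidate to hand to a subset check such as `isPatchSubset`.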
After every applyPatch, onPatch is fired. I'm trying to sync patches to an external server and replay them, and of course this creates an infinite loop. The solution might be to keep a patch history, but then some kind of patch identifier would be useful (is there anything like this)?
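One way to picture the patch identifier asked about here is to attach it in the sync layer rather than expect it from the library. The sketch below is purely illustrative: `PatchEnvelope` and `EnvelopeTracker` are hypothetical names, nothing in it is part of mobx-state-tree, and the `seen` set grows without bound, which a real implementation would need to prune.

```typescript
type JsonPatch = { op: "add" | "remove" | "replace"; path: string; value?: unknown };

// Hypothetical wire format: a batch of patches tagged with an ID and its origin.
type PatchEnvelope = { id: string; origin: string; patches: JsonPatch[] };

class EnvelopeTracker {
  private clientId: string;
  private seq = 0;
  private seen = new Set<string>();

  constructor(clientId: string) {
    this.clientId = clientId;
  }

  // Wraps locally generated patches before they are sent out.
  wrap(patches: JsonPatch[]): PatchEnvelope {
    const id = `${this.clientId}:${this.seq++}`;
    this.seen.add(id);
    return { id, origin: this.clientId, patches };
  }

  // Returns true only for envelopes from other clients that were not seen before.
  shouldApply(envelope: PatchEnvelope): boolean {
    if (envelope.origin === this.clientId || this.seen.has(envelope.id)) return false;
    this.seen.add(envelope.id);
    return true;
  }
}

const clientA = new EnvelopeTracker("client-a");
const clientB = new EnvelopeTracker("client-b");
const envelope = clientA.wrap([{ op: "replace", path: "/title", value: "hi" }]);

console.log(clientA.shouldApply(envelope)); // false: our own envelope echoed back
console.log(clientB.shouldApply(envelope)); // true: first time client-b sees it
console.log(clientB.shouldApply(envelope)); // false: duplicate delivery
```

This only stops echoes and duplicate deliveries at the transport level. Patches re-emitted by onPatch while a received envelope is being applied still have to be filtered separately.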