Skip to content

Commit a1d303c

Browse files
committed
feat: 🎸 improve pull logic
1 parent 636084a commit a1d303c

File tree

5 files changed

+140
-65
lines changed

5 files changed

+140
-65
lines changed

src/json-crdt-repo/local/level/LevelLocalRepo.ts

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import {BehaviorSubject, Observable, Subject, type Subscription} from 'rxjs';
1+
import {BehaviorSubject, Observable, type Subscription} from 'rxjs';
22
import {filter, finalize, map, switchMap} from 'rxjs/operators';
33
import {gzip, ungzip} from '@jsonjoy.com/util/lib/compression/gzip';
44
import {Writer} from '@jsonjoy.com/util/lib/buffers/Writer';
@@ -10,8 +10,8 @@ import {once} from 'thingies/lib/once';
1010
import {timeout} from 'thingies/lib/timeout';
1111
import {pubsub} from '../../pubsub';
1212
import type {RemoteBatch, ServerHistory, ServerPatch} from '../../remote/types';
13-
import type {BlockId, LocalRepo, LocalRepoChangeEvent, LocalRepoDeleteEvent, LocalRepoMergeEvent, LocalRepoRebaseEvent, LocalRepoResetEvent, LocalRepoSyncRequest, LocalRepoSyncResponse} from '../types';
14-
import type {BinStrLevel, BinStrLevelOperation, BlockMetaValue, BlockModelMetadata, BlockModelValue, LevelLocalRepoPubSubMessageLocalRebase, LevelLocalRepoPubSubMessageRemoteMerge, LevelLocalRepoRemotePull, LevelLocalRepoPubSubMessageRemoteReset, LocalBatch, SyncResult, LevelLocalRepoPubSubMessage, LevelLocalRepoPubSub} from './types';
13+
import type {BlockId, LocalRepo, LocalRepoEvent, LocalRepoDeleteEvent, LocalRepoMergeEvent, LocalRepoRebaseEvent, LocalRepoResetEvent, LocalRepoSyncRequest, LocalRepoSyncResponse} from '../types';
14+
import type {BinStrLevel, BinStrLevelOperation, BlockMetaValue, BlockModelMetadata, BlockModelValue, LocalBatch, SyncResult, LevelLocalRepoPubSub} from './types';
1515
import type {CrudLocalRepoCipher} from './types';
1616
import type {Locks} from 'thingies/lib/Locks';
1717
import type {JsonValueCodec} from '@jsonjoy.com/json-pack/lib/codecs/types';
@@ -418,9 +418,9 @@ export class LevelLocalRepo implements LocalRepo {
418418

419419
}
420420

421-
private _remoteSubs: Record<string, Observable<LocalRepoChangeEvent>> = {};
421+
private _remoteSubs: Record<string, Observable<LocalRepoEvent>> = {};
422422

423-
protected subscribeToRemoteChanges(id: BlockId): Observable<LocalRepoChangeEvent> {
423+
protected subscribeToRemoteChanges(id: BlockId): Observable<LocalRepoEvent> {
424424
const blockId = id.join('/');
425425
let sub = this._remoteSubs[blockId];
426426
if (sub) return sub;
@@ -678,12 +678,12 @@ export class LevelLocalRepo implements LocalRepo {
678678
assertTimeout();
679679
await this.kv.batch(ops);
680680
if (pull) {
681-
const data: LevelLocalRepoRemotePull = {
682-
id,
683-
batch,
684-
batches: pull.batches,
685-
snapshot: pull.snapshot
686-
};
681+
// const data: LevelLocalRepoRemotePull = {
682+
// id,
683+
// batch,
684+
// batches: pull.batches,
685+
// snapshot: pull.snapshot
686+
// };
687687
// TODO: Emit something here...
688688
// this.pubsub.pub(['pull', data]);
689689
}
@@ -818,7 +818,7 @@ export class LevelLocalRepo implements LocalRepo {
818818
}
819819
}
820820

821-
public change$(id: BlockId): Observable<LocalRepoChangeEvent> {
821+
public change$(id: BlockId): Observable<LocalRepoEvent> {
822822
return this.pubsub.bus$.pipe(
823823
map((msg) => {
824824
switch (msg.type) {
@@ -865,7 +865,7 @@ export class LevelLocalRepo implements LocalRepo {
865865
}
866866
}
867867
}),
868-
filter((event): event is LocalRepoChangeEvent => !!event),
868+
filter((event): event is LocalRepoEvent => !!event),
869869
);
870870
}
871871
}
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
import {Model, Patch, s} from 'json-joy/lib/json-crdt';
2+
import {setup} from './setup';
3+
import {firstValueFrom, ReplaySubject} from 'rxjs';
4+
import {LocalRepo, LocalRepoMergeEvent, LocalRepoResetEvent} from '../../types';
5+
6+
const get = async (kit: Awaited<ReturnType<typeof setup>>, id = kit.blockId): Promise<Model> => {
7+
const {block} = await kit.remote.client.call('block.get', {id: id.join('/')});
8+
const model = Model.load(block.snapshot.blob);
9+
for (const batch of block.tip)
10+
for (const patch of batch.patches)
11+
model.applyPatch(Patch.fromBinary(patch.blob));
12+
return model;
13+
};
14+
15+
describe('.pull()', () => {
16+
describe('new block', () => {
17+
test('can read a new block', async () => {
18+
const kit = await setup();
19+
const schema = s.obj({foo: s.str('bar')});
20+
const model = Model.create(schema, kit.sid);
21+
const patches = [model.api.flush()];
22+
await kit.remote.client.call('block.new', {
23+
id: kit.blockId.join('/'),
24+
batch: {
25+
patches: [{
26+
blob: patches[0].toBinary(),
27+
}],
28+
}
29+
});
30+
await kit.local.pull(kit.blockId);
31+
const get = await kit.local.get(kit.blockId);
32+
expect(get.model.view()).toEqual({foo: 'bar'});
33+
await kit.stop();
34+
});
35+
36+
test('emits "reset" event', async () => {
37+
const kit = await setup();
38+
const schema = s.obj({foo: s.str('bar')});
39+
const model = Model.create(schema, kit.sid);
40+
const patches = [model.api.flush()];
41+
await kit.remote.client.call('block.new', {
42+
id: kit.blockId.join('/'),
43+
batch: {
44+
patches: [{
45+
blob: patches[0].toBinary(),
46+
}],
47+
}
48+
});
49+
const events$ = new ReplaySubject<LocalRepoResetEvent>(1);
50+
let cnt = 0;
51+
kit.local.change$(kit.blockId).subscribe((event) => {
52+
if (!(event as LocalRepoResetEvent).reset) return;
53+
events$.next(event as LocalRepoResetEvent);
54+
cnt++;
55+
});
56+
await kit.local.pull(kit.blockId);
57+
const event = await firstValueFrom(events$);
58+
expect(cnt).toBe(1);
59+
expect(event.reset.view()).toEqual({foo: 'bar'});
60+
await kit.stop();
61+
});
62+
});
63+
64+
describe('existing block', () => {
65+
test.skip('does nothing if block is up to date', async () => {
66+
// TODO: Implement this test.
67+
});
68+
69+
test.skip('handles case when another thread synchronized the block ahead of time', async () => {
70+
// TODO: Implement this test.
71+
});
72+
73+
test('catches up using "merge" strategy', async () => {
74+
const kit = await setup();
75+
const schema = s.obj({foo: s.str('bar')});
76+
const model = Model.create(schema, kit.sid);
77+
const patches = [model.api.flush()];
78+
const local: LocalRepo = kit.local;
79+
const res = await local.sync({id: kit.blockId, patches});
80+
await res.remote;
81+
const model2 = await get(kit);
82+
expect(model2.view()).toEqual({foo: 'bar'});
83+
model2.api.obj([]).set({foo: 'baz'});
84+
await kit.remote.client.call('block.upd', {
85+
id: kit.blockId.join('/'),
86+
batch: {
87+
patches: [{
88+
blob: model2.api.flush().toBinary(),
89+
}],
90+
},
91+
});
92+
model2.api.obj([]).set({x: 1});
93+
await kit.remote.client.call('block.upd', {
94+
id: kit.blockId.join('/'),
95+
batch: {
96+
patches: [{
97+
blob: model2.api.flush().toBinary(),
98+
}],
99+
},
100+
});
101+
const model3 = await get(kit);
102+
expect(model3.view()).toEqual({foo: 'baz', x: 1});
103+
const events$ = new ReplaySubject<LocalRepoMergeEvent>(1);
104+
let cnt = 0;
105+
kit.local.change$(kit.blockId).subscribe((event) => {
106+
if (!(event as LocalRepoMergeEvent).merge) return;
107+
events$.next(event as LocalRepoMergeEvent);
108+
cnt++;
109+
});
110+
await kit.local.pull(kit.blockId);
111+
const event = await firstValueFrom(events$);
112+
expect(cnt).toBe(1);
113+
expect(model.view()).toEqual({foo: 'bar'});
114+
for (const patch of event.merge) model.applyPatch(patch);
115+
expect(model.view()).toEqual({foo: 'baz', x: 1});
116+
const read = await kit.local.get(kit.blockId);
117+
expect(read.model.view()).toEqual({foo: 'baz', x: 1});
118+
await kit.stop();
119+
});
120+
});
121+
});

src/json-crdt-repo/local/level/__tests__/LevelLocalRepo.spec.ts

Lines changed: 0 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -174,50 +174,4 @@ describe('.sync()', () => {
174174

175175
test.todo('can read block from remote, but create one locally in the meantime');
176176
});
177-
178-
describe('.pull()', () => {
179-
describe('new block', () => {
180-
test('can read a new block', async () => {
181-
const kit = await setup();
182-
const schema = s.obj({foo: s.str('bar')});
183-
const model = Model.create(schema, kit.sid);
184-
const patches = [model.api.flush()];
185-
await kit.remote.client.call('block.new', {
186-
id: kit.blockId.join('/'),
187-
batch: {
188-
patches: [{
189-
blob: patches[0].toBinary(),
190-
}],
191-
}
192-
});
193-
await kit.local.pull(kit.blockId);
194-
const get = await kit.local.get(kit.blockId);
195-
expect(get.model.view()).toEqual({foo: 'bar'});
196-
await kit.stop();
197-
});
198-
199-
test.only('emits "reset" event', async () => {
200-
const kit = await setup();
201-
const schema = s.obj({foo: s.str('bar')});
202-
const model = Model.create(schema, kit.sid);
203-
const patches = [model.api.flush()];
204-
await kit.remote.client.call('block.new', {
205-
id: kit.blockId.join('/'),
206-
batch: {
207-
patches: [{
208-
blob: patches[0].toBinary(),
209-
}],
210-
}
211-
});
212-
kit.local.change$(kit.blockId).subscribe((event) => {
213-
console.log(event);
214-
});
215-
await kit.local.pull(kit.blockId);
216-
217-
218-
219-
await kit.stop();
220-
});
221-
});
222-
});
223177
});

src/json-crdt-repo/local/types.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ export interface LocalRepo {
3838
*
3939
* @param id Unique ID of the block.
4040
*/
41-
change$(id: BlockId): Observable<LocalRepoChangeEvent>;
41+
change$(id: BlockId): Observable<LocalRepoEvent>;
4242
}
4343

4444
/**
@@ -100,7 +100,7 @@ export interface LocalRepoSyncResponse {
100100
* the local client or by a remote client. It contains various types of changes
101101
* that can be applied to the local editing session.
102102
*/
103-
export type LocalRepoChangeEvent =
103+
export type LocalRepoEvent =
104104
| LocalRepoMergeEvent
105105
| LocalRepoRebaseEvent
106106
| LocalRepoResetEvent

src/json-crdt-repo/session/EditSession.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import {Log} from 'json-joy/lib/json-crdt/log/Log';
22
import {Model, Patch} from 'json-joy/lib/json-crdt';
33
import {Subject, takeUntil} from 'rxjs';
4-
import type {BlockId, LocalRepo, LocalRepoChangeEvent, LocalRepoMergeEvent, LocalRepoRebaseEvent, LocalRepoResetEvent} from '../local/types';
4+
import type {BlockId, LocalRepo, LocalRepoEvent, LocalRepoMergeEvent, LocalRepoRebaseEvent, LocalRepoResetEvent} from '../local/types';
55

66
export class EditSession {
77
public log: Log;
@@ -26,9 +26,9 @@ export class EditSession {
2626
this._stop$.next();
2727
}
2828

29-
private events: LocalRepoChangeEvent[] = [];
29+
private events: LocalRepoEvent[] = [];
3030

31-
private onEvent = (event: LocalRepoChangeEvent): void => {
31+
private onEvent = (event: LocalRepoEvent): void => {
3232
this.events.push(event);
3333
this.drainEvents();
3434
};
@@ -41,7 +41,7 @@ export class EditSession {
4141
this.events = [];
4242
}
4343

44-
private processChange(event: LocalRepoChangeEvent): void {
44+
private processChange(event: LocalRepoEvent): void {
4545
if ((event as LocalRepoResetEvent).reset) this.reset((event as LocalRepoResetEvent).reset);
4646
else if ((event as LocalRepoRebaseEvent).rebase) this.rebase((event as LocalRepoRebaseEvent).rebase);
4747
else if ((event as LocalRepoMergeEvent).merge) this.merge((event as LocalRepoMergeEvent).merge);

0 commit comments

Comments
 (0)