Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/__demos__/json-crdt-server/routes/block/methods/upd.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {ResolveType} from 'json-joy/lib/json-type';
import type {RouteDeps, Router, RouterBase} from '../../types';
import {BlockCurRef, BlockIdRef, BlockPatchPartialRef, BlockPatchPartialReturnRef} from '../schema';
import {BlockIdRef, BlockPatchPartialRef, BlockPatchPartialReturnRef} from '../schema';

export const upd =
({t, services}: RouteDeps) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,10 @@ export class BlocksServices {
if (!limit || Math.round(limit) !== limit) throw RpcError.badRequest('INVALID_LIMIT');
if (limit > 0) {
min = Number(offset) || 0;
max = min + limit;
max = min + limit - 1;
} else {
max = Number(offset) || 0;
min = max - limit;
min = max - limit + 1;
}
if (min < 0) {
min = 0;
Expand Down
101 changes: 46 additions & 55 deletions src/json-crdt-repo/remote/DemoServerRemoteHistory.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type {Observable} from 'rxjs';
import {CallerToMethods, TypedRpcClient} from '../../common';
import type {Observable} from 'rxjs';
import type {JsonJoyDemoRpcCaller} from '../../__demos__/json-crdt-server';
import type {RemoteHistory, RemoteBlockSnapshot, RemoteBlockPatch, RemoteBlock} from './types';

Expand All @@ -17,84 +17,75 @@ export class DemoServerRemoteHistory
constructor(protected readonly client: TypedRpcClient<Methods>) {}

public async read(id: string): Promise<{block: DemoServerBlock}> {
throw new Error('Method not implemented.');
const res = await this.client.call('block.get', {id});

// return {
// cursor: model.seq,
// model,
// patches: [],
// };
return {
block: {
id: res.block.id,
snapshot: res.block.snapshot,
tip: [],
},
};
}

public async scanFwd(id: string, cursor: Cursor): Promise<{patches: DemoServerPatch[]}> {
throw new Error('Method not implemented.');
// const limit = 100;
// const res = await this.client.call('block.scan', {
// id,
// seq: cursor,
// limit: cursor + limit,
// });
// if (res.patches.length === 0) {
// return {
// cursor,
// patches: [],
// };
// }
// return {
// cursor: res.patches[res.patches.length - 1].seq,
// patches: res.patches,
// };
const limit = 100;
const res = await this.client.call('block.scan', {
id,
cur: cursor + 1,
limit,
});
return res;
}

public async scanBwd(
id: string,
cursor: Cursor,
): Promise<{snapshot: DemoServerSnapshot; patches: DemoServerPatch[]}> {
throw new Error('Method not implemented.');
// const res = await this.client.call('block.scan', {
// id,
// seq: cursor,
// limit: -100,
// model: true,
// });
): Promise<{snapshot?: DemoServerSnapshot; patches: DemoServerPatch[]}> {
if (cursor <= 0) {
return {
patches: [],
};
}
const res = await this.client.call('block.scan', {
id,
cur: 0,
limit: cursor,
});
return {
patches: res.patches,
};
}

public async create(
id: string,
patches: Pick<DemoServerPatch, 'blob'>[],
): Promise<{
block: Omit<DemoServerBlock, 'data' | 'tip'>;
block: Omit<DemoServerBlock, 'data' | 'tip' | 'snapshot'>;
snapshot: Omit<DemoServerSnapshot, 'blob'>;
patches: Omit<DemoServerPatch, 'blob'>[];
}> {
throw new Error('Method not implemented.');
// await this.client.call('block.new', {
// id,
// patches: patches.map((patch) => ({
// blob: patch.blob,
// })),
// });
const res = await this.client.call('block.new', {
id,
patches: patches.map((patch) => ({
blob: patch.blob,
})),
});
return res;
}

public async update(
id: string,
cursor: Cursor,
patches: Pick<DemoServerPatch, 'blob'>[],
): Promise<{patches: Omit<DemoServerPatch, 'blob'>[]}> {
throw new Error('Method not implemented.');
// const res = await this.client.call('block.upd', {
// id,
// patches: patches.map((patch, seq) => ({
// seq,
// created: Date.now(),
// blob: patch.blob,
// })),
// });
// return {
// cursor: res.patches.length ? res.patches[res.patches.length - 1].seq : cursor,
// patches: res.patches,
// };
const res = await this.client.call('block.upd', {
id,
patches: patches.map((patch) => ({
blob: patch.blob,
})),
});
return {
patches: res.patches,
};
}

public async delete(id: string): Promise<void> {
Expand Down
164 changes: 162 additions & 2 deletions src/json-crdt-repo/remote/__tests__/DemoServerRemoteHistory.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ import {Model} from 'json-joy/lib/json-crdt';
import {buildE2eClient} from '../../../common/testing/buildE2eClient';
import {createCaller} from '../../../__demos__/json-crdt-server/routes';
import {DemoServerRemoteHistory} from '../DemoServerRemoteHistory';
import {SESSION} from 'json-joy/lib/json-crdt-patch/constants';
import {Value} from 'json-joy/lib/json-type-value/Value';

const setup = () => {
const {caller, router} = createCaller();
Expand All @@ -20,7 +22,7 @@ let cnt = 0;
const genId = () => Math.random().toString(36).slice(2) + '-' + Date.now().toString(36) + '-' + cnt++;

describe('.create()', () => {
test.skip('can create a block with a simple patch', async () => {
test('can create a block with a simple patch', async () => {
const {remote, caller} = await setup();
const model = Model.withLogicalClock();
model.api.root({foo: 'bar'});
Expand All @@ -29,8 +31,166 @@ describe('.create()', () => {
const id = genId();
await remote.create(id, [{blob}]);
const {data} = await caller.call('block.get', {id}, {});
// console.log(data.patches);
const model2 = Model.fromBinary(data.block.snapshot.blob);
expect(model2.view()).toEqual({foo: 'bar'});
});

test('can create with empty model', async () => {
const {remote, caller} = await setup();
const id = genId();
await remote.create(id, []);
const {data} = await caller.call('block.get', {id}, {});
const model2 = Model.fromBinary(data.block.snapshot.blob);
expect(model2.view()).toBe(undefined);
});

test('empty model uses global session ID', async () => {
const {remote, caller} = await setup();
const id = genId();
await remote.create(id, []);
const {data} = await caller.call('block.get', {id}, {});
const model2 = Model.fromBinary(data.block.snapshot.blob);
expect(model2.clock.sid).toBe(SESSION.GLOBAL);
});
});

describe('.read()', () => {
test('can read a block with a simple patch', async () => {
const {remote} = await setup();
const model = Model.withLogicalClock();
model.api.root({score: 42});
const patch = model.api.flush();
const blob = patch.toBinary();
const id = genId();
await remote.create(id, [{blob}]);
const read = await remote.read(id);
expect(read).toMatchObject({
block: {
id,
snapshot: {
blob: expect.any(Uint8Array),
cur: 0,
ts: expect.any(Number),
},
tip: [],
},
});
const model2 = Model.fromBinary(read.block.snapshot.blob);
expect(model2.view()).toEqual({score: 42});
});

test('throws NOT_FOUND error on missing block', async () => {
const {remote} = await setup();
const id = genId();
try {
const read = await remote.read(id);
throw new Error('not this error');
} catch (error) {
expect(error).toMatchObject({
message: 'NOT_FOUND',
});
}
});
});

describe('.update()', () => {
test('can apply changes to an empty document', async () => {
const {remote} = await setup();
const id = genId();
await remote.create(id, []);
const read1 = await remote.read(id);
const model1 = Model.fromBinary(read1.block.snapshot.blob);
expect(model1.view()).toBe(undefined);
const model = Model.withLogicalClock();
model.api.root({score: 42});
const patch = model.api.flush();
const blob = patch.toBinary();
const update = await remote.update(id, [{blob}]);
expect(update).toMatchObject({
patches: [
{
ts: expect.any(Number),
},
],
});
const read2 = await remote.read(id);
const model2 = Model.fromBinary(read2.block.snapshot.blob);
expect(model2.view()).toEqual({score: 42});
});
});

describe('.scanFwd()', () => {
test('can scan patches forward', async () => {
const {remote} = await setup();
const id = genId();
const model1 = Model.withLogicalClock();
model1.api.root({score: 42});
const patch1 = model1.api.flush();
const blob = patch1.toBinary();
await remote.create(id, [{blob}]);
const read1 = await remote.read(id);
model1.api.obj([]).set({
foo: 'bar',
});
const patch2 = model1.api.flush();
const blob2 = patch2.toBinary();
await remote.update(id, [{blob: blob2}]);
const scan1 = await remote.scanFwd(id, read1.block.snapshot.cur);
expect(scan1).toMatchObject({
patches: [
{
blob: expect.any(Uint8Array),
ts: expect.any(Number),
},
],
});
expect(scan1.patches[0].blob).toEqual(blob2);
});
});

describe('.scanBwd()', () => {
test('can scan patches backward', async () => {
const {remote} = await setup();
const id = genId();
const model1 = Model.withLogicalClock();
model1.api.root({score: 42});
const patch1 = model1.api.flush();
const blob1 = patch1.toBinary();
await remote.create(id, [{blob: blob1}]);
const read1 = await remote.read(id);
model1.api.obj([]).set({
foo: 'bar',
});
const patch2 = model1.api.flush();
const blob2 = patch2.toBinary();
await remote.update(id, [{blob: blob2}]);
const read2 = await remote.read(id);
const scan1 = await remote.scanBwd(id, read2.block.snapshot.cur);
expect(scan1.patches.length).toBe(1);
expect(scan1).toMatchObject({
patches: [
{
blob: expect.any(Uint8Array),
ts: expect.any(Number),
},
],
});
expect(scan1.patches[0].blob).toEqual(blob1);
});
});

describe('.delete()', () => {
test('can delete an existing block', async () => {
const {remote, caller} = await setup();
const id = genId();
await remote.create(id, []);
const get1 = await caller.call('block.get', {id}, {});
await remote.delete(id);
try {
const get2 = await caller.call('block.get', {id}, {});
throw new Error('not this error');
} catch (err) {
expect((err as Value<any>).data.message).toBe('NOT_FOUND');
}
});
});
8 changes: 4 additions & 4 deletions src/json-crdt-repo/remote/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ export interface RemoteHistory<
* @param id ID of the block.
* @param cursor The cursor until which to scan.
*/
scanBwd(id: string, cursor: Cursor): Promise<{snapshot: S; patches: P[]}>;
scanBwd(id: string, cursor: Cursor): Promise<{patches: P[]; snapshot?: S}>;

/**
* Create a new block with the given patches.
Expand All @@ -71,7 +71,7 @@ export interface RemoteHistory<
id: string,
patches: Pick<P, 'blob'>[],
): Promise<{
block: Omit<B, 'data' | 'tip'>;
block: Omit<B, 'snapshot' | 'tip'>;
snapshot: Omit<S, 'blob'>;
patches: Omit<P, 'blob'>[];
}>;
Expand All @@ -83,7 +83,7 @@ export interface RemoteHistory<
* @param cursor The cursor of the last known model state of the block.
* @param patches A list of patches to apply to the block.
*/
update(id: string, cursor: Cursor, patches: Pick<P, 'blob'>[]): Promise<{patches: Omit<P, 'blob'>[]}>;
update(id: string, patches: Pick<P, 'blob'>[]): Promise<{patches: Omit<P, 'blob'>[]}>;

/**
* Delete the block. If not implemented, means that the protocol does not
Expand Down Expand Up @@ -120,7 +120,7 @@ export interface RemoteBlock<Cursor> {
/**
* The latest snapshot of the block.
*/
data: RemoteBlockSnapshot<Cursor>;
snapshot: RemoteBlockSnapshot<Cursor>;

/**
* The latest patches that have been stored, but not yet applied to the the
Expand Down